"""
ceph manager -- Thrasher and CephManager objects
"""
from cStringIO import StringIO
from functools import wraps
import contextlib
import random
import signal
import time
import gevent
import base64
import json
import logging
import threading
import traceback
import os
from teuthology import misc as teuthology
from tasks.scrub import Scrubber
from util.rados import cmd_erasure_code_profile
from util import get_remote
from teuthology.contextutil import safe_while
from teuthology.orchestra.remote import Remote
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError

try:
    from subprocess import DEVNULL  # py3k
except ImportError:
    DEVNULL = open(os.devnull, 'r+')

DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'

log = logging.getLogger(__name__)
def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
    conf_fp = StringIO()
    ctx.ceph[cluster].conf.write(conf_fp)
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'python', '-c',
            ('import shutil, sys; '
             'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
            conf_path,
            run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
        ],
        stdin=run.PIPE,
        wait=False)
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)
def mount_osd_data(ctx, remote, cluster, osd):
    """
    Mount a remote OSD

    :param ctx: Context
    :param remote: Remote site
    :param cluster: name of ceph cluster
    :param osd: Osd name
    """
    log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
    role = "{0}.osd.{1}".format(cluster, osd)
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    if remote in ctx.disk_config.remote_to_roles_to_dev:
        if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
            role = alt_role
        if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
            return
        dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
        mount_options = ctx.disk_config.\
            remote_to_roles_to_dev_mount_options[remote][role]
        fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
        mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))

        log.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
                 'mountpoint: {p}, type: {t}, options: {v}'.format(
                     o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
                     c=cluster))

        remote.run(
            args=[
                'sudo', 'mount', '-t', fstype,
                '-o', ','.join(mount_options),
                dev, mnt,
            ]
        )
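
# --- Illustrative sketch (not part of the original module) -----------------
# mount_osd_data() above looks up the device, fstype and mount options that
# the ceph task recorded in ctx.disk_config and turns them into a single
# "sudo mount" invocation.  The helper below shows the same argv construction
# in isolation; its name and the example values are assumptions made purely
# for illustration.
def _example_mount_argv(dev, fstype, mount_options, mnt):
    """Build the mount command an OSD data mount would roughly use."""
    return ['sudo', 'mount', '-t', fstype,
            '-o', ','.join(mount_options), dev, mnt]

# Example:
#   _example_mount_argv('/dev/vdb', 'xfs', ['noatime'],
#                       '/var/lib/ceph/osd/ceph-0')
#   -> ['sudo', 'mount', '-t', 'xfs', '-o', 'noatime', '/dev/vdb',
#       '/var/lib/ceph/osd/ceph-0']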
class Thrasher:
    """
    Object used to thrash Ceph
    """
    def __init__(self, manager, config, logger=None):
        self.ceph_manager = manager
        self.cluster = manager.cluster
        self.ceph_manager.wait_for_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.live_osds = osd_status['live']
        self.out_osds = osd_status['out']
        self.dead_osds = osd_status['dead']
        self.stopping = False
        self.logger = logger
        self.config = config
        self.revive_timeout = self.config.get("revive_timeout", 150)
        self.pools_to_fix_pgp_num = set()
        if self.config.get('powercycle'):
            self.revive_timeout += 120
        self.clean_wait = self.config.get('clean_wait', 0)
        self.minin = self.config.get("min_in", 3)
        self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
        self.sighup_delay = self.config.get('sighup_delay')
        self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
        self.dump_ops_enable = self.config.get('dump_ops_enable')
        self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
        self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
        self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap', 1.0)

        num_osds = self.in_osds + self.out_osds
        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
        if self.logger is not None:
            self.log = lambda x: self.logger.info(x)
        else:
            def tmp(x):
                """
                Implement log behavior
                """
                print x
            self.log = tmp
        if self.config is None:
            self.config = dict()
        # prevent monitor from auto-marking things out while thrasher runs
        # try both old and new tell syntax, in case we are testing old code
        self.saved_options = []
        # assuming that the default settings do not vary from one daemon to
        # another
        first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
        opts = [('mon', 'mon_osd_down_out_interval', 0)]
        for service, opt, new_value in opts:
            old_value = manager.get_config(first_mon[0],
                                           first_mon[1],
                                           opt)
            self.saved_options.append((service, opt, old_value))
            self._set_config(service, '*', opt, new_value)
        # initialize ceph_objectstore_tool property - must be done before
        # do_thrash is spawned - http://tracker.ceph.com/issues/18799
        if (self.config.get('powercycle') or
                not self.cmd_exists_on_osds("ceph-objectstore-tool") or
                self.config.get('disable_objectstore_tool_tests', False)):
            self.ceph_objectstore_tool = False
            self.test_rm_past_intervals = False
            if self.config.get('powercycle'):
                self.log("Unable to test ceph-objectstore-tool, "
                         "powercycle testing")
            else:
                self.log("Unable to test ceph-objectstore-tool, "
                         "not available on all OSD nodes")
        else:
            self.ceph_objectstore_tool = \
                self.config.get('ceph_objectstore_tool', True)
            self.test_rm_past_intervals = \
                self.config.get('test_rm_past_intervals', True)

        self.thread = gevent.spawn(self.do_thrash)
        if self.sighup_delay:
            self.sighup_thread = gevent.spawn(self.do_sighup)
        if self.optrack_toggle_delay:
            self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
        if self.dump_ops_enable == "true":
            self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
        if self.noscrub_toggle_delay:
            self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
    def _set_config(self, service_type, service_id, name, value):
        opt_arg = '--{name} {value}'.format(name=name, value=value)
        try:
            whom = '.'.join([service_type, service_id])
            self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
                                              'injectargs', opt_arg)
        except Exception:
            self.ceph_manager.raw_cluster_cmd('--', service_type,
                                              service_id,
                                              'injectargs', opt_arg)
    def cmd_exists_on_osds(self, cmd):
        allremotes = self.ceph_manager.ctx.cluster.only(\
            teuthology.is_type('osd', self.cluster)).remotes.keys()
        allremotes = list(set(allremotes))
        for remote in allremotes:
            proc = remote.run(args=['type', cmd], wait=True,
                              check_status=False, stdout=StringIO(),
                              stderr=StringIO())
            if proc.exitstatus != 0:
                return False
        return True
    def kill_osd(self, osd=None, mark_down=False, mark_out=False):
        """
        :param osd: Osd to be killed.
        :mark_down: Mark down if true.
        :mark_out: Mark out if true.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s" % (str(osd),
                                                       str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.kill_osd(osd)
        if mark_down:
            self.ceph_manager.mark_down_osd(osd)
        if mark_out and osd in self.in_osds:
            self.out_osd(osd)
        if self.ceph_objectstore_tool:
            self.log("Testing ceph-objectstore-tool on down osd")
            remote = self.ceph_manager.find_remote('osd', osd)
            FSPATH = self.ceph_manager.get_filepath()
            JPATH = os.path.join(FSPATH, "journal")
            exp_osd = imp_osd = osd
            exp_remote = imp_remote = remote
            # If an older osd is available we'll move a pg from there
            if (len(self.dead_osds) > 1 and
                    random.random() < self.chance_move_pg):
                exp_osd = random.choice(self.dead_osds[:-1])
                exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
            if ('keyvaluestore_backend' in
                    self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
                          "--data-path {fpath} --journal-path {jpath} "
                          "--type keyvaluestore "
                          "--log-file="
                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
                          format(fpath=FSPATH, jpath=JPATH))
            else:
                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
                          "--data-path {fpath} --journal-path {jpath} "
                          "--log-file="
                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
                          format(fpath=FSPATH, jpath=JPATH))
            cmd = (prefix + "--op list-pgs").format(id=exp_osd)

            # ceph-objectstore-tool might be temporarily absent during an
            # upgrade - see http://tracker.ceph.com/issues/18014
            with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
                while proceed():
                    proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
                                          wait=True, check_status=False, stdout=StringIO(),
                                          stderr=StringIO())
                    if proc.exitstatus == 0:
                        break
                    log.debug("ceph-objectstore-tool binary not present, trying again")

            # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
            # see http://tracker.ceph.com/issues/19556
            with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
                while proceed():
                    proc = exp_remote.run(args=cmd, wait=True,
                                          check_status=False,
                                          stdout=StringIO(), stderr=StringIO())
                    if proc.exitstatus == 0:
                        break
                    elif proc.exitstatus == 1 and proc.stderr == "OSD has the store locked":
                        continue
                    else:
                        raise Exception("ceph-objectstore-tool: "
                                        "exp list-pgs failure with status {ret}".
                                        format(ret=proc.exitstatus))

            pgs = proc.stdout.getvalue().split('\n')[:-1]
            if len(pgs) == 0:
                self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
                return
            pg = random.choice(pgs)
            exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
            exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
            exp_path = os.path.join(exp_path,
                                    "exp.{pg}.{id}".format(
                                        pg=pg,
                                        id=exp_osd))
            # export
            cmd = prefix + "--op export --pgid {pg} --file {file}"
            cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
            proc = exp_remote.run(args=cmd)
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "export failure with status {ret}".
                                format(ret=proc.exitstatus))
            # remove
            cmd = prefix + "--op remove --pgid {pg}"
            cmd = cmd.format(id=exp_osd, pg=pg)
            proc = exp_remote.run(args=cmd)
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "remove failure with status {ret}".
                                format(ret=proc.exitstatus))
            # If there are at least 2 dead osds we might move the pg
            if exp_osd != imp_osd:
                # If pg isn't already on this osd, then we will move it there
                cmd = (prefix + "--op list-pgs").format(id=imp_osd)
                proc = imp_remote.run(args=cmd, wait=True,
                                      check_status=False, stdout=StringIO())
                if proc.exitstatus:
                    raise Exception("ceph-objectstore-tool: "
                                    "imp list-pgs failure with status {ret}".
                                    format(ret=proc.exitstatus))
                pgs = proc.stdout.getvalue().split('\n')[:-1]
                if pg not in pgs:
                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
                             format(pg=pg, fosd=exp_osd, tosd=imp_osd))
                    if imp_remote != exp_remote:
                        # Copy export file to the other machine
                        self.log("Transfer export file from {srem} to {trem}".
                                 format(srem=exp_remote, trem=imp_remote))
                        tmpexport = Remote.get_file(exp_remote, exp_path)
                        Remote.put_file(imp_remote, tmpexport, exp_path)
                        os.remove(tmpexport)
                else:
                    # Can't move the pg after all
                    imp_osd = exp_osd
                    imp_remote = exp_remote
            # import
            cmd = (prefix + "--op import --file {file}")
            cmd = cmd.format(id=imp_osd, file=exp_path)
            proc = imp_remote.run(args=cmd, wait=True, check_status=False,
                                  stderr=StringIO())
            if proc.exitstatus == 1:
                bogosity = "The OSD you are using is older than the exported PG"
                if bogosity in proc.stderr.getvalue():
                    self.log("OSD older than exported PG"
                             "...ignored")
            elif proc.exitstatus == 10:
                self.log("Pool went away before processing an import"
                         "...ignored")
            elif proc.exitstatus == 11:
                self.log("Attempt to import an incompatible export"
                         "...ignored")
            elif proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "import failure with status {ret}".
                                format(ret=proc.exitstatus))
            cmd = "rm -f {file}".format(file=exp_path)
            exp_remote.run(args=cmd)
            if imp_remote != exp_remote:
                imp_remote.run(args=cmd)

            # apply low split settings to each pool
            for pool in self.ceph_manager.list_pools():
                no_sudo_prefix = prefix[5:]
                cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
                       "--filestore-split-multiple 1' sudo -E "
                       + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
                proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
                output = proc.stderr.getvalue()
                if 'Couldn\'t find pool' in output:
                    continue
                if proc.exitstatus:
                    raise Exception("ceph-objectstore-tool apply-layout-settings"
                                    " failed with {status}".format(status=proc.exitstatus))
    def rm_past_intervals(self, osd=None):
        """
        :param osd: Osd to find pg to remove past intervals
        """
        if self.test_rm_past_intervals:
            if osd is None:
                osd = random.choice(self.dead_osds)
            self.log("Use ceph_objectstore_tool to remove past intervals")
            remote = self.ceph_manager.find_remote('osd', osd)
            FSPATH = self.ceph_manager.get_filepath()
            JPATH = os.path.join(FSPATH, "journal")
            if ('keyvaluestore_backend' in
                    self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
                          "--data-path {fpath} --journal-path {jpath} "
                          "--type keyvaluestore "
                          "--log-file="
                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
                          format(fpath=FSPATH, jpath=JPATH))
            else:
                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
                          "--data-path {fpath} --journal-path {jpath} "
                          "--log-file="
                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
                          format(fpath=FSPATH, jpath=JPATH))
            cmd = (prefix + "--op list-pgs").format(id=osd)
            proc = remote.run(args=cmd, wait=True,
                              check_status=False, stdout=StringIO())
            if proc.exitstatus:
                raise Exception("ceph_objectstore_tool: "
                                "exp list-pgs failure with status {ret}".
                                format(ret=proc.exitstatus))
            pgs = proc.stdout.getvalue().split('\n')[:-1]
            if len(pgs) == 0:
                self.log("No PGs found for osd.{osd}".format(osd=osd))
                return
            pg = random.choice(pgs)
            cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
                format(id=osd, pg=pg)
            proc = remote.run(args=cmd)
            if proc.exitstatus:
                raise Exception("ceph_objectstore_tool: "
                                "rm-past-intervals failure with status {ret}".
                                format(ret=proc.exitstatus))
    def blackhole_kill_osd(self, osd=None):
        """
        If all else fails, kill the osd.
        :param osd: Osd to be killed.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Blackholing and then killing osd %s, live_osds are %s" %
                 (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.blackhole_kill_osd(osd)
    def revive_osd(self, osd=None, skip_admin_check=False):
        """
        Revive the osd.
        :param osd: Osd to be revived.
        """
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % (str(osd),))
        self.ceph_manager.revive_osd(
            osd,
            self.revive_timeout,
            skip_admin_check=skip_admin_check)
        self.dead_osds.remove(osd)
        self.live_osds.append(osd)
    def out_osd(self, osd=None):
        """
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s" %
                 (str(osd), str(self.in_osds)))
        self.ceph_manager.mark_out_osd(osd)
        self.in_osds.remove(osd)
        self.out_osds.append(osd)
    def in_osd(self, osd=None):
        """
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.out_osds)
        if osd in self.dead_osds:
            return self.revive_osd(osd)
        self.log("Adding osd %s" % (str(osd),))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)
        self.log("Added osd %s" % (str(osd),))
    def reweight_osd_or_by_util(self, osd=None):
        """
        Reweight an osd that is in
        :param osd: Osd to be marked.
        """
        if osd is not None or random.choice([True, False]):
            if osd is None:
                osd = random.choice(self.in_osds)
            val = random.uniform(.1, 1.0)
            self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(val))
        else:
            # do it several times, the option space is large
            for i in range(5):
                options = {
                    'max_change': random.choice(['0.05', '1.0', '3.0']),
                    'overage': random.choice(['110', '1000']),
                    'type': random.choice([
                        'reweight-by-utilization',
                        'test-reweight-by-utilization']),
                }
                self.log("Reweighting by: %s" % (str(options),))
                self.ceph_manager.raw_cluster_cmd(
                    'osd',
                    options['type'],
                    options['overage'],
                    options['max_change'])
    def primary_affinity(self, osd=None):
        if osd is None:
            osd = random.choice(self.in_osds)
        if random.random() >= .5:
            pa = random.random()
        elif random.random() >= .5:
            pa = 1
        else:
            pa = 0
        self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                          str(osd), str(pa))
    def thrash_cluster_full(self):
        """
        Set and unset cluster full condition
        """
        self.log('Setting full ratio to .001')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
        self.log('Setting full ratio back to .95')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
    def thrash_pg_upmap(self):
        """
        Install or remove random pg_upmap entries in OSDMap
        """
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap, ignoring')
    def thrash_pg_upmap_items(self):
        """
        Install or remove random pg_upmap_items entries in OSDMap
        """
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap_items']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap-items',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap-items, ignoring')
    def all_up(self):
        """
        Make sure all osds are up and not out.
        """
        while len(self.dead_osds) > 0:
            self.log("reviving osd")
            self.revive_osd()
        while len(self.out_osds) > 0:
            self.log("inning osd")
            self.in_osd()
    def do_join(self):
        """
        Break out of this Ceph loop
        """
        self.stopping = True
        self.thread.get()
        if self.sighup_delay:
            self.log("joining the do_sighup greenlet")
            self.sighup_thread.get()
        if self.optrack_toggle_delay:
            self.log("joining the do_optrack_toggle greenlet")
            self.optrack_toggle_thread.join()
        if self.dump_ops_enable == "true":
            self.log("joining the do_dump_ops greenlet")
            self.dump_ops_thread.join()
        if self.noscrub_toggle_delay:
            self.log("joining the do_noscrub_toggle greenlet")
            self.noscrub_toggle_thread.join()
    def grow_pool(self):
        """
        Increase the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
        self.log("Growing pool %s" % (pool,))
        if self.ceph_manager.expand_pool(pool,
                                         self.config.get('pool_grow_by', 10),
                                         self.max_pgs):
            self.pools_to_fix_pgp_num.add(pool)
    def fix_pgp_num(self, pool=None):
        """
        Fix number of pgs in pool.
        """
        if pool is None:
            pool = self.ceph_manager.get_pool()
            force = False
        else:
            force = True
        self.log("fixing pg num pool %s" % (pool,))
        if self.ceph_manager.set_pool_pgpnum(pool, force):
            self.pools_to_fix_pgp_num.discard(pool)
    def test_pool_min_size(self):
        """
        Kill and revive all osds except one.
        """
        self.log("test_pool_min_size")
        self.all_up()
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )
        the_one = random.choice(self.in_osds)
        self.log("Killing everyone but %s" % (the_one,))
        to_kill = filter(lambda x: x != the_one, self.in_osds)
        [self.kill_osd(i) for i in to_kill]
        [self.out_osd(i) for i in to_kill]
        time.sleep(self.config.get("test_pool_min_size_time", 10))
        self.log("Killing %s" % (the_one,))
        self.kill_osd(the_one)
        self.out_osd(the_one)
        self.log("Reviving everyone but %s" % (the_one,))
        [self.revive_osd(i) for i in to_kill]
        [self.in_osd(i) for i in to_kill]
        self.log("Revived everyone but %s" % (the_one,))
        self.log("Waiting for clean")
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )
    def inject_pause(self, conf_key, duration, check_after, should_be_down):
        """
        Pause injection testing. Check for osd being down when finished.
        """
        the_one = random.choice(self.live_osds)
        self.log("inject_pause on {osd}".format(osd=the_one))
        self.log(
            "Testing {key} pause injection for duration {duration}".format(
                key=conf_key,
                duration=duration
                ))
        self.log(
            "Checking after {after}, should_be_down={shouldbedown}".format(
                after=check_after,
                shouldbedown=should_be_down
                ))
        self.ceph_manager.set_config(the_one, **{conf_key: duration})
        if not should_be_down:
            return
        time.sleep(check_after)
        status = self.ceph_manager.get_osd_status()
        assert the_one in status['down']
        time.sleep(duration - check_after + 20)
        status = self.ceph_manager.get_osd_status()
        assert not the_one in status['down']
    def test_backfill_full(self):
        """
        Test backfills stopping when the replica fills up.

        First, use injectfull admin command to simulate a now full
        osd by setting it to 0 on all of the OSDs.

        Second, on a random subset, set
        osd_debug_skip_full_check_in_backfill_reservation to force
        the more complicated check in do_scan to be exercised.

        Then, verify that all backfills stop.
        """
        self.log("injecting backfill full")
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation=
                random.choice(['false', 'true']))
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
        for i in range(30):
            status = self.ceph_manager.compile_pg_status()
            if 'backfill' not in status.keys():
                break
            self.log(
                "waiting for {still_going} backfills".format(
                    still_going=status.get('backfill')))
            time.sleep(1)
        assert('backfill' not in self.ceph_manager.compile_pg_status().keys())
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation='false')
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
    def test_map_discontinuity(self):
        """
        1) Allows the osds to recover
        2) kills an osd
        3) allows the remaining osds to recover
        4) waits for some time
        5) revives the osd
        This sequence should cause the revived osd to have to handle
        a map gap since the mons would have trimmed
        """
        while len(self.in_osds) < (self.minin + 1):
            self.in_osd()
        self.log("Waiting for recovery")
        self.ceph_manager.wait_for_all_up(
            timeout=self.config.get('timeout')
            )
        # now we wait 20s for the pg status to change, if it takes longer,
        # the test *should* fail!
        time.sleep(20)
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
            )

        # now we wait 20s for the backfill replicas to hear about the clean
        time.sleep(20)
        self.log("Recovered, killing an osd")
        self.kill_osd(mark_down=True, mark_out=True)
        self.log("Waiting for clean again")
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
            )
        self.log("Waiting for trim")
        time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
        self.revive_osd()
    def choose_action(self):
        """
        Random action selector.
        """
        chance_down = self.config.get('chance_down', 0.4)
        chance_test_min_size = self.config.get('chance_test_min_size', 0)
        chance_test_backfill_full = \
            self.config.get('chance_test_backfill_full', 0)
        if isinstance(chance_down, int):
            chance_down = float(chance_down) / 100
        minin = self.minin
        minout = self.config.get("min_out", 0)
        minlive = self.config.get("min_live", 2)
        mindead = self.config.get("min_dead", 0)

        self.log('choose_action: min_in %d min_out '
                 '%d min_live %d min_dead %d' %
                 (minin, minout, minlive, mindead))
        actions = []
        if len(self.in_osds) > minin:
            actions.append((self.out_osd, 1.0,))
        if len(self.live_osds) > minlive and chance_down > 0:
            actions.append((self.kill_osd, chance_down,))
        if len(self.dead_osds) > 1:
            actions.append((self.rm_past_intervals, 1.0,))
        if len(self.out_osds) > minout:
            actions.append((self.in_osd, 1.7,))
        if len(self.dead_osds) > mindead:
            actions.append((self.revive_osd, 1.0,))
        if self.config.get('thrash_primary_affinity', True):
            actions.append((self.primary_affinity, 1.0,))
        actions.append((self.reweight_osd_or_by_util,
                        self.config.get('reweight_osd', .5),))
        actions.append((self.grow_pool,
                        self.config.get('chance_pgnum_grow', 0),))
        actions.append((self.fix_pgp_num,
                        self.config.get('chance_pgpnum_fix', 0),))
        actions.append((self.test_pool_min_size,
                        chance_test_min_size,))
        actions.append((self.test_backfill_full,
                        chance_test_backfill_full,))
        if self.chance_thrash_cluster_full > 0:
            actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
        if self.chance_thrash_pg_upmap > 0:
            actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
        if self.chance_thrash_pg_upmap_items > 0:
            actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))

        for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
            for scenario in [
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_short', 3),
                                   0,
                                   False),
                 self.config.get('chance_inject_pause_short', 1),),
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_long', 80),
                                   self.config.get('pause_check_after', 70),
                                   True),
                 self.config.get('chance_inject_pause_long', 0),)]:
                actions.append(scenario)

        total = sum([y for (x, y) in actions])
        val = random.uniform(0, total)
        for (action, prob) in actions:
            if val < prob:
                return action
            val -= prob
        return None
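
    # --- Illustrative sketch (not part of the original class) --------------
    # choose_action() above builds a list of (callable, weight) pairs and then
    # samples one action with probability proportional to its weight.  The
    # helper below shows that selection step on its own; the name
    # _example_weighted_choice is an assumption made purely for illustration.
    def _example_weighted_choice(actions):
        """Pick a callable from [(callable, weight), ...] by weighted chance."""
        total = sum(weight for _, weight in actions)
        val = random.uniform(0, total)
        for action, weight in actions:
            if val < weight:
                return action
            val -= weight
        return None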
    def log_exc(func):
        """
        Log any exception raised by the wrapped greenlet body, then re-raise.
        """
        @wraps(func)
        def wrapper(self):
            try:
                return func(self)
            except:
                self.log(traceback.format_exc())
                raise
        return wrapper
    @log_exc
    def do_sighup(self):
        """
        Loops and sends signal.SIGHUP to a random live osd.

        Loop delay is controlled by the config value sighup_delay.
        """
        delay = float(self.sighup_delay)
        self.log("starting do_sighup with a delay of {0}".format(delay))
        while not self.stopping:
            osd = random.choice(self.live_osds)
            self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
            time.sleep(delay)
    @log_exc
    def do_optrack_toggle(self):
        """
        Loops and toggle op tracking to all osds.

        Loop delay is controlled by the config value optrack_toggle_delay.
        """
        delay = float(self.optrack_toggle_delay)
        osd_state = "true"
        self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if osd_state == "true":
                osd_state = "false"
            else:
                osd_state = "true"
            self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
                                                     'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
            gevent.sleep(delay)
    @log_exc
    def do_dump_ops(self):
        """
        Loops and does op dumps on all osds
        """
        self.log("starting do_dump_ops")
        while not self.stopping:
            for osd in self.live_osds:
                # Ignore errors because live_osds is in flux
                self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
            gevent.sleep(0)
    @log_exc
    def do_noscrub_toggle(self):
        """
        Loops and toggle noscrub flags

        Loop delay is controlled by the config value noscrub_toggle_delay.
        """
        delay = float(self.noscrub_toggle_delay)
        scrub_state = "none"
        self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if scrub_state == "none":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
                scrub_state = "noscrub"
            elif scrub_state == "noscrub":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
                scrub_state = "both"
            elif scrub_state == "both":
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
                scrub_state = "nodeep-scrub"
            else:
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
                scrub_state = "none"
            gevent.sleep(delay)
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
    @log_exc
    def do_thrash(self):
        """
        Loop to select random actions to thrash ceph manager with.
        """
        cleanint = self.config.get("clean_interval", 60)
        scrubint = self.config.get("scrub_interval", -1)
        maxdead = self.config.get("max_dead", 0)
        delay = self.config.get("op_delay", 5)
        self.log("starting do_thrash")
        while not self.stopping:
            to_log = [str(x) for x in ["in_osds: ", self.in_osds,
                                       "out_osds: ", self.out_osds,
                                       "dead_osds: ", self.dead_osds,
                                       "live_osds: ", self.live_osds]]
            self.log(" ".join(to_log))
            if random.uniform(0, 1) < (float(delay) / cleanint):
                while len(self.dead_osds) > maxdead:
                    self.revive_osd()
                for osd in self.in_osds:
                    self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                      str(osd), str(1))
                if random.uniform(0, 1) < float(
                        self.config.get('chance_test_map_discontinuity', 0)):
                    self.test_map_discontinuity()
                else:
                    self.ceph_manager.wait_for_recovery(
                        timeout=self.config.get('timeout')
                        )
                time.sleep(self.clean_wait)
                if scrubint > 0:
                    if random.uniform(0, 1) < (float(delay) / scrubint):
                        self.log('Scrubbing while thrashing being performed')
                        Scrubber(self.ceph_manager, self.config)
            self.choose_action()()
            time.sleep(delay)
        for pool in list(self.pools_to_fix_pgp_num):
            if self.ceph_manager.get_pool_pg_num(pool) > 0:
                self.fix_pgp_num(pool)
        self.pools_to_fix_pgp_num.clear()
        for service, opt, saved_value in self.saved_options:
            self._set_config(service, '*', opt, saved_value)
        self.saved_options = []
class ObjectStoreTool:

    def __init__(self, manager, pool, **kwargs):
        self.manager = manager
        self.pool = pool
        self.osd = kwargs.get('osd', None)
        self.object_name = kwargs.get('object_name', None)
        self.do_revive = kwargs.get('do_revive', True)
        if self.osd and self.pool and self.object_name:
            if self.osd == "primary":
                self.osd = self.manager.get_object_primary(self.pool,
                                                           self.object_name)
        assert self.osd is not None
        if self.object_name:
            self.pgid = self.manager.get_object_pg_with_shard(self.pool,
                                                              self.object_name,
                                                              self.osd)
        self.remote = self.manager.ctx.\
            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
        path = self.manager.get_filepath().format(id=self.osd)
        self.paths = ("--data-path {path} --journal-path {path}/journal".
                      format(path=path))

    def build_cmd(self, options, args, stdin):
        lines = []
        if self.object_name:
            lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
                         "{paths} --pgid {pgid} --op list |"
                         "grep '\"oid\":\"{name}\"')".
                         format(paths=self.paths,
                                pgid=self.pgid,
                                name=self.object_name))
            args = '"$object" ' + args
            options += " --pgid {pgid}".format(pgid=self.pgid)
        cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
               format(paths=self.paths,
                      args=args,
                      options=options))
        if stdin:
            cmd = ("echo {payload} | base64 --decode | {cmd}".
                   format(payload=base64.encode(stdin),
                          cmd=cmd))
        lines.append(cmd)
        return "\n".join(lines)

    def run(self, options, args, stdin=None, stdout=None):
        if stdout is None:
            stdout = StringIO()
        self.manager.kill_osd(self.osd)
        cmd = self.build_cmd(options, args, stdin)
        self.manager.log(cmd)
        try:
            proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
                                   check_status=False,
                                   stdout=stdout,
                                   stderr=StringIO())
            proc.wait()
            if proc.exitstatus != 0:
                self.manager.log("failed with " + str(proc.exitstatus))
                error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
                raise Exception(error)
        finally:
            if self.do_revive:
                self.manager.revive_osd(self.osd)
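
# --- Illustrative sketch (not part of the original module) -----------------
# ObjectStoreTool.build_cmd() above composes a small shell script: optionally
# locate the target object's listing with "--op list", then run
# ceph-objectstore-tool with the caller's options, optionally piping
# base64-decoded stdin into it.  The helper below mimics that composition with
# plain strings so the shape of the generated command is easy to see; the
# function name and example values are assumptions for illustration only.
def _example_objectstore_cmd(paths, options, args, pgid=None, object_name=None):
    lines = []
    if object_name:
        lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
                     "{paths} --pgid {pgid} --op list |"
                     "grep '\"oid\":\"{name}\"')".format(
                         paths=paths, pgid=pgid, name=object_name))
        args = '"$object" ' + args
        options += " --pgid {pgid}".format(pgid=pgid)
    lines.append("sudo adjust-ulimits ceph-objectstore-tool "
                 "{paths} {options} {args}".format(
                     paths=paths, options=options, args=args))
    return "\n".join(lines)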
class CephManager:
    """
    Ceph manager object.
    Contains several local functions that form a bulk of this module.

    Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
    the same name.
    """

    REPLICATED_POOL = 1
    ERASURE_CODED_POOL = 3

    def __init__(self, controller, ctx=None, config=None, logger=None,
                 cluster='ceph'):
        self.lock = threading.RLock()
        self.ctx = ctx
        self.config = config
        self.controller = controller
        self.next_pool_id = 0
        self.cluster = cluster
        if (logger):
            self.log = lambda x: logger.info(x)
        else:
            def tmp(x):
                """
                implement log behavior.
                """
                print x
            self.log = tmp
        if self.config is None:
            self.config = dict()
        pools = self.list_pools()
        self.pools = {}
        for pool in pools:
            # we may race with a pool deletion; ignore failures here
            try:
                self.pools[pool] = self.get_pool_property(pool, 'pg_num')
            except CommandFailedError:
                self.log('Failed to get pg_num from pool %s, ignoring' % pool)
    def raw_cluster_cmd(self, *args):
        """
        Start ceph on a raw cluster.  Return count
        """
        testdir = teuthology.get_testdir(self.ctx)
        ceph_args = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'timeout',
            '120',
            'ceph',
            '--cluster',
            self.cluster,
        ]
        ceph_args.extend(args)
        proc = self.controller.run(
            args=ceph_args,
            stdout=StringIO(),
            )
        return proc.stdout.getvalue()

    def raw_cluster_cmd_result(self, *args):
        """
        Start ceph on a cluster.  Return success or failure information.
        """
        testdir = teuthology.get_testdir(self.ctx)
        ceph_args = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'timeout',
            '120',
            'ceph',
            '--cluster',
            self.cluster,
        ]
        ceph_args.extend(args)
        proc = self.controller.run(
            args=ceph_args,
            check_status=False,
            )
        return proc.exitstatus

    def run_ceph_w(self):
        """
        Execute "ceph -w" in the background with stdout connected to a StringIO,
        and return the RemoteProcess.
        """
        return self.controller.run(
            args=['sudo',
                  'daemon-helper',
                  'kill',
                  'ceph',
                  '--cluster',
                  self.cluster,
                  '-w'],
            wait=False, stdout=StringIO(), stdin=run.PIPE)
    def do_rados(self, remote, cmd, check_status=True):
        """
        Execute a remote rados command.
        """
        testdir = teuthology.get_testdir(self.ctx)
        pre = [
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'rados',
            '--cluster',
            self.cluster,
        ]
        pre.extend(cmd)
        proc = remote.run(
            args=pre,
            wait=True,
            check_status=check_status
            )
        return proc

    def rados_write_objects(self, pool, num_objects, size,
                            timelimit, threads, cleanup=False):
        """
        Write rados objects
        Threads not used yet.
        """
        args = [
            '-p', pool,
            '--num-objects', num_objects,
            '-b', size,
            'bench', timelimit,
            'write'
        ]
        if not cleanup:
            args.append('--no-cleanup')
        return self.do_rados(self.controller, map(str, args))

    def do_put(self, pool, obj, fname, namespace=None):
        """
        Implement rados put operation
        """
        args = ['-p', pool]
        if namespace is not None:
            args += ['-N', namespace]
        args += [
            'put',
            obj,
            fname
        ]
        return self.do_rados(
            self.controller,
            args,
            )

    def do_get(self, pool, obj, fname='/dev/null', namespace=None):
        """
        Implement rados get operation
        """
        args = ['-p', pool]
        if namespace is not None:
            args += ['-N', namespace]
        args += [
            'get',
            obj,
            fname
        ]
        return self.do_rados(
            self.controller,
            args,
            )

    def do_rm(self, pool, obj, namespace=None):
        """
        Implement rados rm operation
        """
        args = ['-p', pool]
        if namespace is not None:
            args += ['-N', namespace]
        args += [
            'rm',
            obj
        ]
        return self.do_rados(
            self.controller,
            args,
            )

    def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
        """
        Remotely start up ceph specifying the admin socket
        """
        return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
    def find_remote(self, service_type, service_id):
        """
        Get the Remote for the host where a particular service runs.

        :param service_type: 'mds', 'osd', 'client'
        :param service_id: The second part of a role, e.g. '0' for
                           the role 'client.0'
        :return: a Remote instance for the host where the
                 requested role is placed
        """
        return get_remote(self.ctx, self.cluster,
                          service_type, service_id)

    def admin_socket(self, service_type, service_id,
                     command, check_status=True, timeout=0, stdout=None):
        """
        Remotely start up ceph specifying the admin socket
        :param command: a list of words to use as the command
                        to the admin socket
        """
        if stdout is None:
            stdout = StringIO()
        testdir = teuthology.get_testdir(self.ctx)
        remote = self.find_remote(service_type, service_id)
        args = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'timeout',
            str(timeout),
            'ceph',
            '--cluster',
            self.cluster,
            '--admin-daemon',
            '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
                cluster=self.cluster,
                type=service_type,
                id=service_id),
        ]
        args.extend(command)
        return remote.run(
            args=args,
            stdout=stdout,
            wait=True,
            check_status=check_status
            )

    def objectstore_tool(self, pool, options, args, **kwargs):
        return ObjectStoreTool(self, pool, **kwargs).run(options, args)
    def get_pgid(self, pool, pgnum):
        """
        :param pool: pool name
        :param pgnum: pg number
        :returns: a string representing this pg.
        """
        poolnum = self.get_pool_num(pool)
        pg_str = "{poolnum}.{pgnum}".format(
            poolnum=poolnum,
            pgnum=pgnum)
        return pg_str

    def get_pg_replica(self, pool, pgnum):
        """
        get replica for pool, pgnum (e.g. (data, 0)->0
        """
        pg_str = self.get_pgid(pool, pgnum)
        output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
        j = json.loads('\n'.join(output.split('\n')[1:]))
        return int(j['acting'][-1])

    def wait_for_pg_stats(func):
        # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
        # by default, and take the faulty injection in ms into consideration,
        # 12 seconds are more than enough
        delays = [1, 1, 2, 3, 5, 8, 13]

        @wraps(func)
        def wrapper(self, *args, **kwargs):
            exc = None
            for delay in delays:
                try:
                    return func(self, *args, **kwargs)
                except AssertionError as e:
                    time.sleep(delay)
                    exc = e
            raise exc
        return wrapper

    def get_pg_primary(self, pool, pgnum):
        """
        get primary for pool, pgnum (e.g. (data, 0)->0
        """
        pg_str = self.get_pgid(pool, pgnum)
        output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
        j = json.loads('\n'.join(output.split('\n')[1:]))
        return int(j['acting'][0])
    def get_pool_num(self, pool):
        """
        get number for pool (e.g., data -> 2)
        """
        return int(self.get_pool_dump(pool)['pool'])

    def list_pools(self):
        """
        list all pool names
        """
        osd_dump = self.get_osd_dump_json()
        self.log(osd_dump['pools'])
        return [str(i['pool_name']) for i in osd_dump['pools']]

    def clear_pools(self):
        """
        remove all pools
        """
        [self.remove_pool(i) for i in self.list_pools()]

    def kick_recovery_wq(self, osdnum):
        """
        Run kick_recovery_wq on cluster.
        """
        return self.raw_cluster_cmd(
            'tell', "osd.%d" % (int(osdnum),),
            'debug',
            'kick_recovery_wq',
            '0')

    def wait_run_admin_socket(self, service_type,
                              service_id, args=['version'], timeout=75, stdout=None):
        """
        If osd_admin_socket call succeeds, return.  Otherwise wait
        five seconds and try again.
        """
        if stdout is None:
            stdout = StringIO()
        tries = 0
        while True:
            proc = self.admin_socket(service_type, service_id,
                                     args, check_status=False, stdout=stdout)
            if proc.exitstatus is 0:
                return proc
            else:
                tries += 1
                if (tries * 5) > timeout:
                    raise Exception('timed out waiting for admin_socket '
                                    'to appear after {type}.{id} restart'.
                                    format(type=service_type,
                                           id=service_id))
                self.log("waiting on admin_socket for {type}-{id}, "
                         "{command}".format(type=service_type,
                                            id=service_id,
                                            command=args))
                time.sleep(5)
    def get_pool_dump(self, pool):
        """
        get the osd dump part of a pool
        """
        osd_dump = self.get_osd_dump_json()
        for i in osd_dump['pools']:
            if i['pool_name'] == pool:
                return i
        assert False

    def get_config(self, service_type, service_id, name):
        """
        :param node: like 'mon.a'
        :param name: the option name
        """
        proc = self.wait_run_admin_socket(service_type, service_id,
                                          ['config', 'show'])
        j = json.loads(proc.stdout.getvalue())
        return j[name]

    def set_config(self, osdnum, **argdict):
        """
        :param osdnum: osd number
        :param argdict: dictionary containing values to set.
        """
        for k, v in argdict.iteritems():
            self.wait_run_admin_socket(
                'osd', osdnum,
                ['config', 'set', str(k), str(v)])

    def raw_cluster_status(self):
        """
        Get status from cluster
        """
        status = self.raw_cluster_cmd('status', '--format=json-pretty')
        return json.loads(status)
    def raw_osd_status(self):
        """
        Get osd status from cluster
        """
        return self.raw_cluster_cmd('osd', 'dump')

    def get_osd_status(self):
        """
        Get osd statuses sorted by states that the osds are in.
        """
        osd_lines = filter(
            lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
            self.raw_osd_status().split('\n'))
        self.log(osd_lines)
        in_osds = [int(i[4:].split()[0])
                   for i in filter(lambda x: " in " in x, osd_lines)]
        out_osds = [int(i[4:].split()[0])
                    for i in filter(lambda x: " out " in x, osd_lines)]
        up_osds = [int(i[4:].split()[0])
                   for i in filter(lambda x: " up " in x, osd_lines)]
        down_osds = [int(i[4:].split()[0])
                     for i in filter(lambda x: " down " in x, osd_lines)]
        dead_osds = [int(x.id_)
                     for x in filter(lambda x:
                                     not x.running(),
                                     self.ctx.daemons.
                                     iter_daemons_of_role('osd', self.cluster))]
        live_osds = [int(x.id_) for x in
                     filter(lambda x:
                            x.running(),
                            self.ctx.daemons.iter_daemons_of_role('osd',
                                                                  self.cluster))]
        return {'in': in_osds, 'out': out_osds, 'up': up_osds,
                'down': down_osds, 'dead': dead_osds, 'live': live_osds,
                'raw': osd_lines}

    def get_num_pgs(self):
        """
        Check cluster status for the number of pgs
        """
        status = self.raw_cluster_status()
        self.log(status)
        return status['pgmap']['num_pgs']
    def create_erasure_code_profile(self, profile_name, profile):
        """
        Create an erasure code profile name that can be used as a parameter
        when creating an erasure coded pool.
        """
        args = cmd_erasure_code_profile(profile_name, profile)
        self.raw_cluster_cmd(*args)

    def create_pool_with_unique_name(self, pg_num=16,
                                     erasure_code_profile_name=None,
                                     min_size=None,
                                     erasure_code_use_overwrites=False):
        """
        Create a pool named unique_pool_X where X is unique.
        """
        name = ""
        with self.lock:
            name = "unique_pool_%s" % (str(self.next_pool_id),)
            self.next_pool_id += 1
            self.create_pool(
                name,
                pg_num,
                erasure_code_profile_name=erasure_code_profile_name,
                min_size=min_size,
                erasure_code_use_overwrites=erasure_code_use_overwrites)
        return name
    @contextlib.contextmanager
    def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
        self.create_pool(pool_name, pg_num, erasure_code_profile_name)
        yield
        self.remove_pool(pool_name)

    def create_pool(self, pool_name, pg_num=16,
                    erasure_code_profile_name=None,
                    min_size=None,
                    erasure_code_use_overwrites=False):
        """
        Create a pool named from the pool_name parameter.
        :param pool_name: name of the pool being created.
        :param pg_num: initial number of pgs.
        :param erasure_code_profile_name: if set and !None create an
                                          erasure coded pool using the profile
        :param erasure_code_use_overwrites: if true, allow overwrites
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert isinstance(pg_num, int)
            assert pool_name not in self.pools
            self.log("creating pool_name %s" % (pool_name,))
            if erasure_code_profile_name:
                self.raw_cluster_cmd('osd', 'pool', 'create',
                                     pool_name, str(pg_num), str(pg_num),
                                     'erasure', erasure_code_profile_name)
            else:
                self.raw_cluster_cmd('osd', 'pool', 'create',
                                     pool_name, str(pg_num))
            if min_size is not None:
                self.raw_cluster_cmd(
                    'osd', 'pool', 'set', pool_name,
                    'min_size',
                    str(min_size))
            if erasure_code_use_overwrites:
                self.raw_cluster_cmd(
                    'osd', 'pool', 'set', pool_name,
                    'allow_ec_overwrites',
                    'true')
            self.pools[pool_name] = pg_num
    def add_pool_snap(self, pool_name, snap_name):
        """
        Add pool snapshot
        :param pool_name: name of pool to snapshot
        :param snap_name: name of snapshot to take
        """
        self.raw_cluster_cmd('osd', 'pool', 'mksnap',
                             str(pool_name), str(snap_name))

    def remove_pool_snap(self, pool_name, snap_name):
        """
        Remove pool snapshot
        :param pool_name: name of pool to snapshot
        :param snap_name: name of snapshot to remove
        """
        self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
                             str(pool_name), str(snap_name))

    def remove_pool(self, pool_name):
        """
        Remove the indicated pool
        :param pool_name: Pool to be removed
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert pool_name in self.pools
            self.log("removing pool_name %s" % (pool_name,))
            del self.pools[pool_name]
            self.do_rados(self.controller,
                          ['rmpool', pool_name, pool_name,
                           "--yes-i-really-really-mean-it"])

    def get_pool(self):
        """
        Pick a random pool
        """
        with self.lock:
            return random.choice(self.pools.keys())
    def get_pool_pg_num(self, pool_name):
        """
        Return the number of pgs in the pool specified.
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            if pool_name in self.pools:
                return self.pools[pool_name]
        return 0

    def get_pool_property(self, pool_name, prop):
        """
        :param pool_name: pool
        :param prop: property to be checked.
        :returns: property as an int value.
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert isinstance(prop, basestring)
            output = self.raw_cluster_cmd(
                'osd',
                'pool',
                'get',
                pool_name,
                prop)
            return int(output.split()[1])
    def set_pool_property(self, pool_name, prop, val):
        """
        :param pool_name: pool
        :param prop: property to be set.
        :param val: value to set.

        This routine retries if set operation fails.
        """
        assert isinstance(pool_name, basestring)
        assert isinstance(prop, basestring)
        assert isinstance(val, int)
        tries = 0
        while True:
            r = self.raw_cluster_cmd_result(
                'osd',
                'pool',
                'set',
                pool_name,
                prop,
                str(val))
            if r != 11:  # EAGAIN
                break
            tries += 1
            if tries > 50:
                raise Exception('timed out getting EAGAIN '
                                'when setting pool property %s %s = %s' %
                                (pool_name, prop, val))
            self.log('got EAGAIN setting pool property, '
                     'waiting a few seconds...')
            time.sleep(2)
    def expand_pool(self, pool_name, by, max_pgs):
        """
        Increase the number of pgs in a pool
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert isinstance(by, int)
            assert pool_name in self.pools
            if self.get_num_creating() > 0:
                return False
            if (self.pools[pool_name] + by) > max_pgs:
                return False
            self.log("increase pool size by %d" % (by,))
            new_pg_num = self.pools[pool_name] + by
            self.set_pool_property(pool_name, "pg_num", new_pg_num)
            self.pools[pool_name] = new_pg_num
            return True

    def set_pool_pgpnum(self, pool_name, force):
        """
        Set pgpnum property of pool_name pool.
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert pool_name in self.pools
            if not force and self.get_num_creating() > 0:
                return False
            self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
            return True
    def list_pg_missing(self, pgid):
        """
        return list of missing pgs with the id specified
        """
        r = None
        offset = {}
        while True:
            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
                                       json.dumps(offset))
            j = json.loads(out)
            if r is None:
                r = j
            else:
                r['objects'].extend(j['objects'])
            r['num_objects'] = len(r['objects'])
            if j['num_objects'] == 0:
                break
            offset = j['objects'][-1]['oid']
        return r

    def get_pg_stats(self):
        """
        Dump the cluster and get pg stats
        """
        out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        return j['pg_stats']
    def compile_pg_status(self):
        """
        Return a histogram of pg state values
        """
        ret = {}
        j = self.get_pg_stats()
        for pg in j:
            for status in pg['state'].split('+'):
                if status not in ret:
                    ret[status] = 0
                ret[status] += 1
        return ret

    def with_pg_state(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        assert(check(stats['state']))

    def with_pg(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        return check(stats)

    def get_last_scrub_stamp(self, pool, pgnum):
        """
        Get the timestamp of the last scrub.
        """
        stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
        return stats["last_scrub_stamp"]
    def do_pg_scrub(self, pool, pgnum, stype):
        """
        Scrub pg and wait for scrubbing to finish
        """
        init = self.get_last_scrub_stamp(pool, pgnum)
        RESEND_TIMEOUT = 120    # Must be a multiple of SLEEP_TIME
        FATAL_TIMEOUT = RESEND_TIMEOUT * 3
        SLEEP_TIME = 10
        timer = 0
        while init == self.get_last_scrub_stamp(pool, pgnum):
            assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
            self.log("waiting for scrub type %s" % (stype,))
            if (timer % RESEND_TIMEOUT) == 0:
                self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
                # The first time in this loop is the actual request
                if timer != 0 and stype == "repair":
                    self.log("WARNING: Resubmitted a non-idempotent repair")
            time.sleep(SLEEP_TIME)
            timer += SLEEP_TIME
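
    # --- Illustrative sketch (not part of the original class) --------------
    # compile_pg_status() earlier in this class folds every PG's "state"
    # string (for example "active+clean" or "active+recovery_wait+degraded")
    # into a histogram of individual state tokens, which do_pg_scrub() and the
    # thrasher then inspect.  The helper below shows the same fold over a
    # plain list of state strings; it exists only to illustrate the idea.
    def _example_state_histogram(states):
        """Count individual '+'-separated tokens across pg state strings."""
        ret = {}
        for state in states:
            for token in state.split('+'):
                ret[token] = ret.get(token, 0) + 1
        return ret

    # Example:
    #   _example_state_histogram(['active+clean', 'active+degraded'])
    #   -> {'active': 2, 'clean': 1, 'degraded': 1}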
    def wait_snap_trimming_complete(self, pool):
        """
        Wait for snap trimming on pool to end
        """
        POLL_PERIOD = 10
        FATAL_TIMEOUT = 600
        start = time.time()
        poolnum = self.get_pool_num(pool)
        poolnumstr = "%s." % (poolnum,)
        while (True):
            now = time.time()
            if (now - start) > FATAL_TIMEOUT:
                assert (now - start) < FATAL_TIMEOUT, \
                    'failed to complete snap trimming before timeout'
            all_stats = self.get_pg_stats()
            trimming = False
            for pg in all_stats:
                if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
                    self.log("pg {pg} in trimming, state: {state}".format(
                        pg=pg['pgid'],
                        state=pg['state']))
                    trimming = True
            if not trimming:
                break
            self.log("{pool} still trimming, waiting".format(pool=pool))
            time.sleep(POLL_PERIOD)
    def get_single_pg_stats(self, pgid):
        """
        Return pg for the pgid specified.
        """
        all_stats = self.get_pg_stats()
        for pg in all_stats:
            if pg['pgid'] == pgid:
                return pg
        return None

    def get_object_pg_with_shard(self, pool, name, osdid):
        pool_dump = self.get_pool_dump(pool)
        object_map = self.get_object_map(pool, name)
        if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
            shard = object_map['acting'].index(osdid)
            return "{pgid}s{shard}".format(pgid=object_map['pgid'],
                                           shard=shard)
        else:
            return object_map['pgid']

    def get_object_primary(self, pool, name):
        object_map = self.get_object_map(pool, name)
        return object_map['acting_primary']

    def get_object_map(self, pool, name):
        """
        osd map --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
        return json.loads('\n'.join(out.split('\n')[1:]))
    def get_osd_dump_json(self):
        """
        osd dump --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_osd_dump(self):
        """
        Dump osds
        :returns: all osds
        """
        return self.get_osd_dump_json()['osds']

    def get_stuck_pgs(self, type_, threshold):
        """
        :returns: stuck pg information from the cluster
        """
        out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
                                   '--format=json')
        return json.loads(out)

    def get_num_unfound_objects(self):
        """
        Check cluster status to get the number of unfound objects
        """
        status = self.raw_cluster_status()
        self.log(status)
        return status['pgmap'].get('unfound_objects', 0)

    def get_num_creating(self):
        """
        Find the number of pgs in creating mode.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if 'creating' in pg['state']:
                num += 1
        return num
    def get_num_active_clean(self):
        """
        Find the number of active and clean pgs.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    pg['state'].count('clean') and
                    not pg['state'].count('stale')):
                num += 1
        return num

    def get_num_active_recovered(self):
        """
        Find the number of active and recovered pgs.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    not pg['state'].count('recover') and
                    not pg['state'].count('backfill') and
                    not pg['state'].count('stale')):
                num += 1
        return num

    def get_is_making_recovery_progress(self):
        """
        Return whether there is recovery progress discernable in the
        raw cluster status
        """
        status = self.raw_cluster_status()
        kps = status['pgmap'].get('recovering_keys_per_sec', 0)
        bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
        ops = status['pgmap'].get('recovering_objects_per_sec', 0)
        return kps > 0 or bps > 0 or ops > 0

    def get_num_active(self):
        """
        Find the number of active pgs.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if pg['state'].count('active') and not pg['state'].count('stale'):
                num += 1
        return num

    def get_num_down(self):
        """
        Find the number of pgs that are down.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if ((pg['state'].count('down') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('incomplete') and not
                 pg['state'].count('stale'))):
                num += 1
        return num

    def get_num_active_down(self):
        """
        Find the number of pgs that are either active or down.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if ((pg['state'].count('active') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('down') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('incomplete') and not
                 pg['state'].count('stale'))):
                num += 1
        return num
    def is_clean(self):
        """
        True if all pgs are clean
        """
        return self.get_num_active_clean() == self.get_num_pgs()

    def is_recovered(self):
        """
        True if all pgs have recovered
        """
        return self.get_num_active_recovered() == self.get_num_pgs()

    def is_active_or_down(self):
        """
        True if all pgs are active or down
        """
        return self.get_num_active_down() == self.get_num_pgs()
    def wait_for_clean(self, timeout=None):
        """
        Returns true when all pgs are clean.
        """
        self.log("waiting for clean")
        start = time.time()
        num_active_clean = self.get_num_active_clean()
        while not self.is_clean():
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if time.time() - start >= timeout:
                        self.log('dumping pgs')
                        out = self.raw_cluster_cmd('pg', 'dump')
                        self.log(out)
                        assert time.time() - start < timeout, \
                            'failed to become clean before timeout expired'
            cur_active_clean = self.get_num_active_clean()
            if cur_active_clean != num_active_clean:
                start = time.time()
                num_active_clean = cur_active_clean
            time.sleep(3)
        self.log("clean!")
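
    # --- Illustrative sketch (not part of the original class) --------------
    # wait_for_clean() above (and the other wait_for_* helpers that follow)
    # all use the same pattern: poll a predicate and reset the timeout clock
    # whenever visible progress is made, so a slow-but-moving recovery is not
    # failed prematurely.  The loop below shows that pattern in isolation; the
    # function and its parameters are assumptions made purely for illustration.
    def _example_poll_with_progress(done, progressing, timeout, sleep=3):
        start = time.time()
        while not done():
            if progressing():
                start = time.time()   # progress seen: restart the clock
            assert time.time() - start < timeout, 'timed out without progress'
            time.sleep(sleep)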
    def are_all_osds_up(self):
        """
        Returns true if all osds are up.
        """
        x = self.get_osd_dump()
        return (len(x) == sum([(y['up'] > 0) for y in x]))

    def wait_for_all_up(self, timeout=None):
        """
        When this exits, either the timeout has expired, or all
        osds are up.
        """
        self.log("waiting for all up")
        start = time.time()
        while not self.are_all_osds_up():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_all_up'
            time.sleep(3)
        self.log("all up!")
    def wait_for_recovery(self, timeout=None):
        """
        Check peering. When this exists, we have recovered.
        """
        self.log("waiting for recovery to complete")
        start = time.time()
        num_active_recovered = self.get_num_active_recovered()
        while not self.is_recovered():
            now = time.time()
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if now - start >= timeout:
                        self.log('dumping pgs')
                        out = self.raw_cluster_cmd('pg', 'dump')
                        self.log(out)
                        assert now - start < timeout, \
                            'failed to recover before timeout expired'
            cur_active_recovered = self.get_num_active_recovered()
            if cur_active_recovered != num_active_recovered:
                start = time.time()
                num_active_recovered = cur_active_recovered
            time.sleep(3)
        self.log("recovered!")
    def wait_for_active(self, timeout=None):
        """
        Check peering. When this exists, we are definitely active
        """
        self.log("waiting for peering to complete")
        start = time.time()
        num_active = self.get_num_active()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to recover before timeout expired'
            cur_active = self.get_num_active()
            if cur_active != num_active:
                start = time.time()
                num_active = cur_active
            time.sleep(3)
        self.log("active!")
    def wait_for_active_or_down(self, timeout=None):
        """
        Check peering. When this exists, we are definitely either
        active or down
        """
        self.log("waiting for peering to complete or become blocked")
        start = time.time()
        num_active_down = self.get_num_active_down()
        while not self.is_active_or_down():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to recover before timeout expired'
            cur_active_down = self.get_num_active_down()
            if cur_active_down != num_active_down:
                start = time.time()
                num_active_down = cur_active_down
            time.sleep(3)
        self.log("active or down!")
    def osd_is_up(self, osd):
        """
        Wrapper for osd check
        """
        osds = self.get_osd_dump()
        return osds[osd]['up'] > 0

    def wait_till_osd_is_up(self, osd, timeout=None):
        """
        Loop waiting for osd.
        """
        self.log('waiting for osd.%d to be up' % osd)
        start = time.time()
        while not self.osd_is_up(osd):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'osd.%d failed to come up before timeout expired' % osd
            time.sleep(3)
        self.log('osd.%d is up' % osd)
    def is_active(self):
        """
        Wrapper to check if all pgs are active
        """
        return self.get_num_active() == self.get_num_pgs()

    def wait_till_active(self, timeout=None):
        """
        Wait until all pgs are active.
        """
        self.log("waiting till active")
        start = time.time()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to become active before timeout expired'
            time.sleep(3)
        self.log("active!")
    def mark_out_osd(self, osd):
        """
        Wrapper to mark osd out.
        """
        self.raw_cluster_cmd('osd', 'out', str(osd))

    def kill_osd(self, osd):
        """
        Kill osds by either power cycling (if indicated by the config)
        or by stopping.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('kill_osd on osd.{o} '
                     'doing powercycle of {s}'.format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
            if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
                self.raw_cluster_cmd(
                    '--', 'tell', 'osd.%d' % osd,
                    'injectargs',
                    '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
                )
                try:
                    self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
                except:
                    pass
                else:
                    raise RuntimeError('osd.%s did not fail' % osd)
            else:
                self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
        else:
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")

    def blackhole_kill_osd(self, osd):
        """
        Stop osd if nothing else works.
        """
        self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
                             'injectargs',
                             '--objectstore-blackhole')
        time.sleep(2)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    def revive_osd(self, osd, timeout=150, skip_admin_check=False):
        """
        Revive osds by either power cycling (if indicated by the config)
        or by restarting.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('kill_osd on osd.{o} doing powercycle of {s}'.
                     format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            if not remote.console.check_status(300):
                raise Exception('Failed to revive osd.{o} via ipmi'.
                                format(o=osd))
            teuthology.reconnect(self.ctx, 60, [remote])
            mount_osd_data(self.ctx, remote, self.cluster, str(osd))
            self.make_admin_daemon_dir(remote)
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

        if not skip_admin_check:
            # wait for dump_ops_in_flight; this command doesn't appear
            # until after the signal handler is installed and it is safe
            # to stop the osd again without making valgrind leak checks
            # unhappy.  see #5924.
            self.wait_run_admin_socket('osd', osd,
                                       args=['dump_ops_in_flight'],
                                       timeout=timeout, stdout=DEVNULL)
    def mark_down_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'down', str(osd))

    def mark_in_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'in', str(osd))

    def signal_osd(self, osd, sig, silent=False):
        """
        Wrapper to local get_daemon call which sends the given
        signal to the given osd.
        """
        self.ctx.daemons.get_daemon('osd', osd,
                                    self.cluster).signal(sig, silent=silent)

    def signal_mon(self, mon, sig, silent=False):
        """
        Wrapper to local get_deamon call
        """
        self.ctx.daemons.get_daemon('mon', mon,
                                    self.cluster).signal(sig, silent=silent)
    def kill_mon(self, mon):
        """
        Kill the monitor by either power cycling (if the config says so),
        or by doing a normal stop.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('kill_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()

    def revive_mon(self, mon):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
    def get_mon_status(self, mon):
        """
        Extract all the monitor status information from the cluster
        """
        addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
        out = self.raw_cluster_cmd('-m', addr, 'mon_status')
        return json.loads(out)

    def get_mon_quorum(self):
        """
        Extract monitor quorum information from the cluster
        """
        out = self.raw_cluster_cmd('quorum_status')
        j = json.loads(out)
        self.log('quorum_status is %s' % out)
        return j['quorum']

    def wait_for_mon_quorum_size(self, size, timeout=300):
        """
        Loop until quorum size is reached.
        """
        self.log('waiting for quorum size %d' % size)
        start = time.time()
        while not len(self.get_mon_quorum()) == size:
            if timeout is not None:
                assert time.time() - start < timeout, \
                    ('failed to reach quorum size %d '
                     'before timeout expired' % size)
            time.sleep(3)
        self.log("quorum is size %d" % size)

    def get_mon_health(self, debug=False):
        """
        Extract all the monitor health information.
        """
        out = self.raw_cluster_cmd('health', '--format=json')
        if debug:
            self.log('health:\n{h}'.format(h=out))
        return json.loads(out)
    def get_mds_status(self, mds):
        """
        Run cluster commands for the mds in order to get mds information
        """
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        # collate; for dup ids, larger gid wins.
        for info in j['info'].itervalues():
            if info['name'] == mds:
                return info
        return None

    def get_filepath(self):
        """
        Return path to osd data with {id} needing to be replaced
        """
        return '/var/lib/ceph/osd/' + self.cluster + '-{id}'

    def make_admin_daemon_dir(self, remote):
        """
        Create /var/run/ceph directory on remote site.

        :param ctx: Context
        :param remote: Remote site
        """
        remote.run(args=['sudo',
                         'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
def utility_task(name):
    """
    Generate ceph_manager subtask corresponding to ceph_manager
    method name
    """
    def task(ctx, config):
        if config is None:
            config = {}
        args = config.get('args', [])
        kwargs = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        fn = getattr(ctx.managers[cluster], name)
        fn(*args, **kwargs)
    return task

revive_osd = utility_task("revive_osd")
revive_mon = utility_task("revive_mon")
kill_osd = utility_task("kill_osd")
kill_mon = utility_task("kill_mon")
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
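
# --- Illustrative sketch (not part of the original module) -----------------
# utility_task() above turns a CephManager method name into a teuthology task
# function: the job yaml's "args"/"kwargs" are forwarded to that method on
# ctx.managers[cluster].  The guarded snippet below fakes a minimal ctx/config
# pair to show the call path; every name and value in it is made up purely for
# illustration and nothing here runs at import time.
if __name__ == '__main__':
    class _FakeManager(object):
        def wait_for_clean(self, timeout=None):
            print('wait_for_clean(timeout=%r)' % timeout)

    class _FakeCtx(object):
        managers = {'ceph': _FakeManager()}

    # Roughly equivalent to a yaml stanza such as:
    #   - ceph_manager.wait_for_clean:
    #       kwargs:
    #         timeout: 300
    wait_for_clean(_FakeCtx(), {'kwargs': {'timeout': 300}})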