2 ceph manager -- Thrasher and CephManager objects
4 from cStringIO
import StringIO
5 from functools
import wraps
17 from teuthology
import misc
as teuthology
18 from tasks
.scrub
import Scrubber
19 from util
.rados
import cmd_erasure_code_profile
20 from util
import get_remote
21 from teuthology
.contextutil
import safe_while
22 from teuthology
.orchestra
.remote
import Remote
23 from teuthology
.orchestra
import run
24 from teuthology
.exceptions
import CommandFailedError
27 from subprocess
import DEVNULL
# py3k
29 DEVNULL
= open(os
.devnull
, 'r+')
31 DEFAULT_CONF_PATH
= '/etc/ceph/ceph.conf'
33 log
= logging
.getLogger(__name__
)
def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
    """
    Serialize the in-memory conf for *cluster* and push it to *conf_path*
    on every remote in the cluster.

    :param ctx: teuthology run context (provides ctx.ceph and ctx.cluster)
    :param conf_path: destination path for the conf file on each remote
    :param cluster: name of the ceph cluster whose conf is written
    """
    conf_fp = StringIO()
    ctx.ceph[cluster].conf.write(conf_fp)
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'python',
            '-c',
            ('import shutil, sys; '
             'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
            conf_path,
            run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
        ],
        stdin=run.PIPE,
        wait=False)
    # Feed the same serialized conf to every remote's stdin in parallel,
    # then wait for all the writes to finish.
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)
def mount_osd_data(ctx, remote, cluster, osd):
    """
    Mount the data directory of one OSD if its device is tracked in
    ctx.disk_config; otherwise do nothing.

    :param ctx: Context (provides ctx.disk_config mappings)
    :param remote: Remote site
    :param cluster: name of ceph cluster
    :param osd: osd id
    """
    log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
    role = "{0}.osd.{1}".format(cluster, osd)
    # Legacy entries for the default cluster are keyed without the
    # cluster prefix; prefer whichever key actually exists.
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    if remote in ctx.disk_config.remote_to_roles_to_dev:
        if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
            role = alt_role
        if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
            return
        dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
        mount_options = ctx.disk_config.\
            remote_to_roles_to_dev_mount_options[remote][role]
        fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
        mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))

        log.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
                 'mountpoint: {p}, type: {t}, options: {v}'.format(
                     o=osd, n=remote.name, p=mnt, t=fstype,
                     v=mount_options, c=cluster))

        remote.run(
            args=[
                'sudo',
                'mount',
                '-t', fstype,
                '-o', ','.join(mount_options),
                dev,
                mnt,
            ]
            )
100 Object used to thrash Ceph
102 def __init__(self
, manager
, config
, logger
=None):
103 self
.ceph_manager
= manager
104 self
.cluster
= manager
.cluster
105 self
.ceph_manager
.wait_for_clean()
106 osd_status
= self
.ceph_manager
.get_osd_status()
107 self
.in_osds
= osd_status
['in']
108 self
.live_osds
= osd_status
['live']
109 self
.out_osds
= osd_status
['out']
110 self
.dead_osds
= osd_status
['dead']
111 self
.stopping
= False
114 self
.revive_timeout
= self
.config
.get("revive_timeout", 150)
115 self
.pools_to_fix_pgp_num
= set()
116 if self
.config
.get('powercycle'):
117 self
.revive_timeout
+= 120
118 self
.clean_wait
= self
.config
.get('clean_wait', 0)
119 self
.minin
= self
.config
.get("min_in", 3)
120 self
.chance_move_pg
= self
.config
.get('chance_move_pg', 1.0)
121 self
.sighup_delay
= self
.config
.get('sighup_delay')
122 self
.optrack_toggle_delay
= self
.config
.get('optrack_toggle_delay')
123 self
.dump_ops_enable
= self
.config
.get('dump_ops_enable')
124 self
.noscrub_toggle_delay
= self
.config
.get('noscrub_toggle_delay')
125 self
.chance_thrash_cluster_full
= self
.config
.get('chance_thrash_cluster_full', .05)
126 self
.chance_thrash_pg_upmap
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
127 self
.chance_thrash_pg_upmap_items
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
128 self
.random_eio
= self
.config
.get('random_eio')
129 self
.chance_force_recovery
= self
.config
.get('chance_force_recovery', 0.3)
131 num_osds
= self
.in_osds
+ self
.out_osds
132 self
.max_pgs
= self
.config
.get("max_pgs_per_pool_osd", 1200) * num_osds
133 if self
.logger
is not None:
134 self
.log
= lambda x
: self
.logger
.info(x
)
138 Implement log behavior
142 if self
.config
is None:
144 # prevent monitor from auto-marking things out while thrasher runs
145 # try both old and new tell syntax, in case we are testing old code
146 self
.saved_options
= []
147 # assuming that the default settings do not vary from one daemon to
149 first_mon
= teuthology
.get_first_mon(manager
.ctx
, self
.config
).split('.')
150 opts
= [('mon', 'mon_osd_down_out_interval', 0)]
151 for service
, opt
, new_value
in opts
:
152 old_value
= manager
.get_config(first_mon
[0],
155 self
.saved_options
.append((service
, opt
, old_value
))
156 self
._set
_config
(service
, '*', opt
, new_value
)
157 # initialize ceph_objectstore_tool property - must be done before
158 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
159 if (self
.config
.get('powercycle') or
160 not self
.cmd_exists_on_osds("ceph-objectstore-tool") or
161 self
.config
.get('disable_objectstore_tool_tests', False)):
162 self
.ceph_objectstore_tool
= False
163 self
.test_rm_past_intervals
= False
164 if self
.config
.get('powercycle'):
165 self
.log("Unable to test ceph-objectstore-tool, "
166 "powercycle testing")
168 self
.log("Unable to test ceph-objectstore-tool, "
169 "not available on all OSD nodes")
171 self
.ceph_objectstore_tool
= \
172 self
.config
.get('ceph_objectstore_tool', True)
173 self
.test_rm_past_intervals
= \
174 self
.config
.get('test_rm_past_intervals', True)
176 self
.thread
= gevent
.spawn(self
.do_thrash
)
177 if self
.sighup_delay
:
178 self
.sighup_thread
= gevent
.spawn(self
.do_sighup
)
179 if self
.optrack_toggle_delay
:
180 self
.optrack_toggle_thread
= gevent
.spawn(self
.do_optrack_toggle
)
181 if self
.dump_ops_enable
== "true":
182 self
.dump_ops_thread
= gevent
.spawn(self
.do_dump_ops
)
183 if self
.noscrub_toggle_delay
:
184 self
.noscrub_toggle_thread
= gevent
.spawn(self
.do_noscrub_toggle
)
def _set_config(self, service_type, service_id, name, value):
    """
    Inject a single config option into the given daemon(s) via
    'ceph tell <who> injectargs'.

    :param service_type: daemon type, e.g. 'mon' or 'osd'
    :param service_id: daemon id, or '*' for all daemons of that type
    :param name: option name
    :param value: option value
    """
    opt_arg = '--{name} {value}'.format(name=name, value=value)
    whom = '.'.join([service_type, service_id])
    # '--' stops the outer CLI from parsing opt_arg as its own option.
    self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
                                      'injectargs', opt_arg)
def cmd_exists_on_osds(self, cmd):
    """
    Return True only if *cmd* resolves (shell 'type') on every OSD remote.

    :param cmd: command name to probe for
    """
    allremotes = self.ceph_manager.ctx.cluster.only(
        teuthology.is_type('osd', self.cluster)).remotes.keys()
    allremotes = list(set(allremotes))
    for remote in allremotes:
        proc = remote.run(args=['type', cmd], wait=True,
                          check_status=False, stdout=StringIO(),
                          stderr=StringIO())
        if proc.exitstatus != 0:
            return False
    return True
205 def kill_osd(self
, osd
=None, mark_down
=False, mark_out
=False):
207 :param osd: Osd to be killed.
208 :mark_down: Mark down if true.
209 :mark_out: Mark out if true.
212 osd
= random
.choice(self
.live_osds
)
213 self
.log("Killing osd %s, live_osds are %s" % (str(osd
),
214 str(self
.live_osds
)))
215 self
.live_osds
.remove(osd
)
216 self
.dead_osds
.append(osd
)
217 self
.ceph_manager
.kill_osd(osd
)
219 self
.ceph_manager
.mark_down_osd(osd
)
220 if mark_out
and osd
in self
.in_osds
:
222 if self
.ceph_objectstore_tool
:
223 self
.log("Testing ceph-objectstore-tool on down osd")
224 remote
= self
.ceph_manager
.find_remote('osd', osd
)
225 FSPATH
= self
.ceph_manager
.get_filepath()
226 JPATH
= os
.path
.join(FSPATH
, "journal")
227 exp_osd
= imp_osd
= osd
228 exp_remote
= imp_remote
= remote
229 # If an older osd is available we'll move a pg from there
230 if (len(self
.dead_osds
) > 1 and
231 random
.random() < self
.chance_move_pg
):
232 exp_osd
= random
.choice(self
.dead_osds
[:-1])
233 exp_remote
= self
.ceph_manager
.find_remote('osd', exp_osd
)
234 if ('keyvaluestore_backend' in
235 self
.ceph_manager
.ctx
.ceph
[self
.cluster
].conf
['osd']):
236 prefix
= ("sudo adjust-ulimits ceph-objectstore-tool "
237 "--data-path {fpath} --journal-path {jpath} "
238 "--type keyvaluestore "
240 "/var/log/ceph/objectstore_tool.\\$pid.log ".
241 format(fpath
=FSPATH
, jpath
=JPATH
))
243 prefix
= ("sudo adjust-ulimits ceph-objectstore-tool "
244 "--data-path {fpath} --journal-path {jpath} "
246 "/var/log/ceph/objectstore_tool.\\$pid.log ".
247 format(fpath
=FSPATH
, jpath
=JPATH
))
248 cmd
= (prefix
+ "--op list-pgs").format(id=exp_osd
)
250 # ceph-objectstore-tool might be temporarily absent during an
251 # upgrade - see http://tracker.ceph.com/issues/18014
252 with
safe_while(sleep
=15, tries
=40, action
="type ceph-objectstore-tool") as proceed
:
254 proc
= exp_remote
.run(args
=['type', 'ceph-objectstore-tool'],
255 wait
=True, check_status
=False, stdout
=StringIO(),
257 if proc
.exitstatus
== 0:
259 log
.debug("ceph-objectstore-tool binary not present, trying again")
261 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
262 # see http://tracker.ceph.com/issues/19556
263 with
safe_while(sleep
=15, tries
=40, action
="ceph-objectstore-tool --op list-pgs") as proceed
:
265 proc
= exp_remote
.run(args
=cmd
, wait
=True,
267 stdout
=StringIO(), stderr
=StringIO())
268 if proc
.exitstatus
== 0:
270 elif proc
.exitstatus
== 1 and proc
.stderr
== "OSD has the store locked":
273 raise Exception("ceph-objectstore-tool: "
274 "exp list-pgs failure with status {ret}".
275 format(ret
=proc
.exitstatus
))
277 pgs
= proc
.stdout
.getvalue().split('\n')[:-1]
279 self
.log("No PGs found for osd.{osd}".format(osd
=exp_osd
))
281 pg
= random
.choice(pgs
)
282 exp_path
= teuthology
.get_testdir(self
.ceph_manager
.ctx
)
283 exp_path
= os
.path
.join(exp_path
, '{0}.data'.format(self
.cluster
))
284 exp_path
= os
.path
.join(exp_path
,
285 "exp.{pg}.{id}".format(
289 cmd
= prefix
+ "--op export --pgid {pg} --file {file}"
290 cmd
= cmd
.format(id=exp_osd
, pg
=pg
, file=exp_path
)
291 proc
= exp_remote
.run(args
=cmd
)
293 raise Exception("ceph-objectstore-tool: "
294 "export failure with status {ret}".
295 format(ret
=proc
.exitstatus
))
297 cmd
= prefix
+ "--op remove --pgid {pg}"
298 cmd
= cmd
.format(id=exp_osd
, pg
=pg
)
299 proc
= exp_remote
.run(args
=cmd
)
301 raise Exception("ceph-objectstore-tool: "
302 "remove failure with status {ret}".
303 format(ret
=proc
.exitstatus
))
304 # If there are at least 2 dead osds we might move the pg
305 if exp_osd
!= imp_osd
:
306 # If pg isn't already on this osd, then we will move it there
307 cmd
= (prefix
+ "--op list-pgs").format(id=imp_osd
)
308 proc
= imp_remote
.run(args
=cmd
, wait
=True,
309 check_status
=False, stdout
=StringIO())
311 raise Exception("ceph-objectstore-tool: "
312 "imp list-pgs failure with status {ret}".
313 format(ret
=proc
.exitstatus
))
314 pgs
= proc
.stdout
.getvalue().split('\n')[:-1]
316 self
.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
317 format(pg
=pg
, fosd
=exp_osd
, tosd
=imp_osd
))
318 if imp_remote
!= exp_remote
:
319 # Copy export file to the other machine
320 self
.log("Transfer export file from {srem} to {trem}".
321 format(srem
=exp_remote
, trem
=imp_remote
))
322 tmpexport
= Remote
.get_file(exp_remote
, exp_path
)
323 Remote
.put_file(imp_remote
, tmpexport
, exp_path
)
326 # Can't move the pg after all
328 imp_remote
= exp_remote
330 cmd
= (prefix
+ "--op import --file {file}")
331 cmd
= cmd
.format(id=imp_osd
, file=exp_path
)
332 proc
= imp_remote
.run(args
=cmd
, wait
=True, check_status
=False,
334 if proc
.exitstatus
== 1:
335 bogosity
= "The OSD you are using is older than the exported PG"
336 if bogosity
in proc
.stderr
.getvalue():
337 self
.log("OSD older than exported PG"
339 elif proc
.exitstatus
== 10:
340 self
.log("Pool went away before processing an import"
342 elif proc
.exitstatus
== 11:
343 self
.log("Attempt to import an incompatible export"
345 elif proc
.exitstatus
:
346 raise Exception("ceph-objectstore-tool: "
347 "import failure with status {ret}".
348 format(ret
=proc
.exitstatus
))
349 cmd
= "rm -f {file}".format(file=exp_path
)
350 exp_remote
.run(args
=cmd
)
351 if imp_remote
!= exp_remote
:
352 imp_remote
.run(args
=cmd
)
354 # apply low split settings to each pool
355 for pool
in self
.ceph_manager
.list_pools():
356 no_sudo_prefix
= prefix
[5:]
357 cmd
= ("CEPH_ARGS='--filestore-merge-threshold 1 "
358 "--filestore-split-multiple 1' sudo -E "
359 + no_sudo_prefix
+ "--op apply-layout-settings --pool " + pool
).format(id=osd
)
360 proc
= remote
.run(args
=cmd
, wait
=True, check_status
=False, stderr
=StringIO())
361 output
= proc
.stderr
.getvalue()
362 if 'Couldn\'t find pool' in output
:
365 raise Exception("ceph-objectstore-tool apply-layout-settings"
366 " failed with {status}".format(status
=proc
.exitstatus
))
368 def rm_past_intervals(self
, osd
=None):
370 :param osd: Osd to find pg to remove past intervals
372 if self
.test_rm_past_intervals
:
374 osd
= random
.choice(self
.dead_osds
)
375 self
.log("Use ceph_objectstore_tool to remove past intervals")
376 remote
= self
.ceph_manager
.find_remote('osd', osd
)
377 FSPATH
= self
.ceph_manager
.get_filepath()
378 JPATH
= os
.path
.join(FSPATH
, "journal")
379 if ('keyvaluestore_backend' in
380 self
.ceph_manager
.ctx
.ceph
[self
.cluster
].conf
['osd']):
381 prefix
= ("sudo adjust-ulimits ceph-objectstore-tool "
382 "--data-path {fpath} --journal-path {jpath} "
383 "--type keyvaluestore "
385 "/var/log/ceph/objectstore_tool.\\$pid.log ".
386 format(fpath
=FSPATH
, jpath
=JPATH
))
388 prefix
= ("sudo adjust-ulimits ceph-objectstore-tool "
389 "--data-path {fpath} --journal-path {jpath} "
391 "/var/log/ceph/objectstore_tool.\\$pid.log ".
392 format(fpath
=FSPATH
, jpath
=JPATH
))
393 cmd
= (prefix
+ "--op list-pgs").format(id=osd
)
394 proc
= remote
.run(args
=cmd
, wait
=True,
395 check_status
=False, stdout
=StringIO())
397 raise Exception("ceph_objectstore_tool: "
398 "exp list-pgs failure with status {ret}".
399 format(ret
=proc
.exitstatus
))
400 pgs
= proc
.stdout
.getvalue().split('\n')[:-1]
402 self
.log("No PGs found for osd.{osd}".format(osd
=osd
))
404 pg
= random
.choice(pgs
)
405 cmd
= (prefix
+ "--op rm-past-intervals --pgid {pg}").\
406 format(id=osd
, pg
=pg
)
407 proc
= remote
.run(args
=cmd
)
409 raise Exception("ceph_objectstore_tool: "
410 "rm-past-intervals failure with status {ret}".
411 format(ret
=proc
.exitstatus
))
def blackhole_kill_osd(self, osd=None):
    """
    If all else fails, kill the osd.
    :param osd: Osd to be killed; a random live osd when None.
    """
    if osd is None:
        osd = random.choice(self.live_osds)
    self.log("Blackholing and then killing osd %s, live_osds are %s" %
             (str(osd), str(self.live_osds)))
    self.live_osds.remove(osd)
    self.dead_osds.append(osd)
    self.ceph_manager.blackhole_kill_osd(osd)
def revive_osd(self, osd=None, skip_admin_check=False):
    """
    Revive an osd and move it from the dead to the live set.
    :param osd: Osd to be revived; a random dead osd when None.
    :param skip_admin_check: passed through to ceph_manager.revive_osd
    """
    if osd is None:
        osd = random.choice(self.dead_osds)
    self.log("Reviving osd %s" % (str(osd),))
    self.ceph_manager.revive_osd(
        osd,
        self.revive_timeout,
        skip_admin_check=skip_admin_check)
    self.dead_osds.remove(osd)
    self.live_osds.append(osd)
    # Re-arm read-error injection if the revived osd is the designated
    # random-eio target (it lost its injected args while down).
    if self.random_eio > 0 and osd is self.rerrosd:
        self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
                      'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
        self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
                      'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
def out_osd(self, osd=None):
    """
    Mark an osd out.
    :param osd: Osd to be marked; a random in-osd when None.
    """
    if osd is None:
        osd = random.choice(self.in_osds)
    self.log("Removing osd %s, in_osds are: %s" %
             (str(osd), str(self.in_osds)))
    self.ceph_manager.mark_out_osd(osd)
    self.in_osds.remove(osd)
    self.out_osds.append(osd)
def in_osd(self, osd=None):
    """
    Mark an osd in (reviving it first if it is currently dead).
    :param osd: Osd to be marked; a random out-osd when None.
    """
    if osd is None:
        osd = random.choice(self.out_osds)
    if osd in self.dead_osds:
        # revive_osd also moves the osd back into the live/in sets
        return self.revive_osd(osd)
    self.log("Adding osd %s" % (str(osd),))
    self.out_osds.remove(osd)
    self.in_osds.append(osd)
    self.ceph_manager.mark_in_osd(osd)
    self.log("Added osd %s" % (str(osd),))
def reweight_osd_or_by_util(self, osd=None):
    """
    Reweight an osd that is in
    :param osd: Osd to be marked; when None, randomly either reweights a
                random in-osd or runs (test-)reweight-by-utilization.
    """
    if osd is not None or random.choice([True, False]):
        if osd is None:
            osd = random.choice(self.in_osds)
        val = random.uniform(.1, 1.0)
        self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
        self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                          str(osd), str(val))
    else:
        # do it several times, the option space is large
        for i in range(5):
            options = {
                'max_change': random.choice(['0.05', '1.0', '3.0']),
                'overage': random.choice(['110', '1000']),
                'type': random.choice([
                    'reweight-by-utilization',
                    'test-reweight-by-utilization']),
            }
            self.log("Reweighting by: %s"%(str(options),))
            self.ceph_manager.raw_cluster_cmd(
                'osd',
                options['type'],
                options['overage'],
                options['max_change'])
def primary_affinity(self, osd=None):
    """
    Set the primary-affinity of an osd to a random value (biased
    toward the extremes 0 and 1).
    :param osd: Osd to adjust; a random in-osd when None.
    """
    if osd is None:
        osd = random.choice(self.in_osds)
    if random.random() >= .5:
        pa = random.random()
    elif random.random() >= .5:
        pa = 1
    else:
        pa = 0
    self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
    self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                      str(osd), str(pa))
def thrash_cluster_full(self):
    """
    Set and unset cluster full condition
    """
    self.log('Setting full ratio to .001')
    self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
    time.sleep(1)
    self.log('Setting full ratio back to .95')
    self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
527 def thrash_pg_upmap(self
):
529 Install or remove random pg_upmap entries in OSDMap
531 from random
import shuffle
532 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
534 self
.log('j is %s' % j
)
536 if random
.random() >= .3:
537 pgs
= self
.ceph_manager
.get_pg_stats()
538 pg
= random
.choice(pgs
)
539 pgid
= str(pg
['pgid'])
540 poolid
= int(pgid
.split('.')[0])
541 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
545 osds
= self
.in_osds
+ self
.out_osds
548 self
.log('Setting %s to %s' % (pgid
, osds
))
549 cmd
= ['osd', 'pg-upmap', pgid
] + [str(x
) for x
in osds
]
550 self
.log('cmd %s' % cmd
)
551 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
557 self
.log('Clearing pg_upmap on %s' % pg
)
558 self
.ceph_manager
.raw_cluster_cmd(
563 self
.log('No pg_upmap entries; doing nothing')
564 except CommandFailedError
:
565 self
.log('Failed to rm-pg-upmap, ignoring')
567 def thrash_pg_upmap_items(self
):
569 Install or remove random pg_upmap_items entries in OSDMap
571 from random
import shuffle
572 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
574 self
.log('j is %s' % j
)
576 if random
.random() >= .3:
577 pgs
= self
.ceph_manager
.get_pg_stats()
578 pg
= random
.choice(pgs
)
579 pgid
= str(pg
['pgid'])
580 poolid
= int(pgid
.split('.')[0])
581 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
585 osds
= self
.in_osds
+ self
.out_osds
588 self
.log('Setting %s to %s' % (pgid
, osds
))
589 cmd
= ['osd', 'pg-upmap-items', pgid
] + [str(x
) for x
in osds
]
590 self
.log('cmd %s' % cmd
)
591 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
593 m
= j
['pg_upmap_items']
597 self
.log('Clearing pg_upmap on %s' % pg
)
598 self
.ceph_manager
.raw_cluster_cmd(
603 self
.log('No pg_upmap entries; doing nothing')
604 except CommandFailedError
:
605 self
.log('Failed to rm-pg-upmap-items, ignoring')
def force_recovery(self):
    """
    Force recovery on some of PGs
    """
    backfill = random.random() >= 0.5
    j = self.ceph_manager.get_pgids_to_force(backfill)
    if j:
        if backfill:
            self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
        else:
            self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
def cancel_force_recovery(self):
    """
    Cancel forced recovery on some of PGs
    """
    backfill = random.random() >= 0.5
    j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
    if j:
        if backfill:
            self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
        else:
            self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
def force_cancel_recovery(self):
    """
    Force or cancel forcing recovery
    """
    if random.random() >= 0.4:
        self.force_recovery()
    else:
        self.cancel_force_recovery()
def all_up(self):
    """
    Make sure all osds are up and not out.
    """
    while len(self.dead_osds) > 0:
        self.log("reviving osd")
        self.revive_osd()
    while len(self.out_osds) > 0:
        self.log("inning osd")
        self.in_osd()
def all_up_in(self):
    """
    Make sure all osds are up and fully in.
    """
    self.all_up()
    for osd in self.live_osds:
        # reset any thrashed weight/affinity back to the defaults
        self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                          str(osd), str(1))
        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                          str(osd), str(1))
def do_join(self):
    """
    Break out of this Ceph loop: stop the thrasher and join every
    helper greenlet that was spawned in __init__.
    """
    self.stopping = True
    self.thread.get()
    if self.sighup_delay:
        self.log("joining the do_sighup greenlet")
        self.sighup_thread.get()
    if self.optrack_toggle_delay:
        self.log("joining the do_optrack_toggle greenlet")
        self.optrack_toggle_thread.join()
    if self.dump_ops_enable == "true":
        self.log("joining the do_dump_ops greenlet")
        self.dump_ops_thread.join()
    if self.noscrub_toggle_delay:
        self.log("joining the do_noscrub_toggle greenlet")
        self.noscrub_toggle_thread.join()
def grow_pool(self):
    """
    Increase the size of the pool
    """
    pool = self.ceph_manager.get_pool()
    orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
    self.log("Growing pool %s" % (pool,))
    if self.ceph_manager.expand_pool(pool,
                                     self.config.get('pool_grow_by', 10),
                                     self.max_pgs):
        # remember to bump pgp_num to match later (see fix_pgp_num)
        self.pools_to_fix_pgp_num.add(pool)
def fix_pgp_num(self, pool=None):
    """
    Fix number of pgs in pool.
    :param pool: pool to fix; a random pool (best-effort) when None.
    """
    if pool is None:
        pool = self.ceph_manager.get_pool()
        force = False
    else:
        # explicit pool: force pgp_num even if pgs are still creating
        force = True
    self.log("fixing pg num pool %s" % (pool,))
    if self.ceph_manager.set_pool_pgpnum(pool, force):
        self.pools_to_fix_pgp_num.discard(pool)
706 def test_pool_min_size(self
):
708 Kill and revive all osds except one.
710 self
.log("test_pool_min_size")
712 self
.ceph_manager
.wait_for_recovery(
713 timeout
=self
.config
.get('timeout')
715 the_one
= random
.choice(self
.in_osds
)
716 self
.log("Killing everyone but %s", the_one
)
717 to_kill
= filter(lambda x
: x
!= the_one
, self
.in_osds
)
718 [self
.kill_osd(i
) for i
in to_kill
]
719 [self
.out_osd(i
) for i
in to_kill
]
720 time
.sleep(self
.config
.get("test_pool_min_size_time", 10))
721 self
.log("Killing %s" % (the_one
,))
722 self
.kill_osd(the_one
)
723 self
.out_osd(the_one
)
724 self
.log("Reviving everyone but %s" % (the_one
,))
725 [self
.revive_osd(i
) for i
in to_kill
]
726 [self
.in_osd(i
) for i
in to_kill
]
727 self
.log("Revived everyone but %s" % (the_one
,))
728 self
.log("Waiting for clean")
729 self
.ceph_manager
.wait_for_recovery(
730 timeout
=self
.config
.get('timeout')
def inject_pause(self, conf_key, duration, check_after, should_be_down):
    """
    Pause injection testing. Check for osd being down when finished.

    :param conf_key: config option used to inject the pause
    :param duration: how long the pause is injected for, in seconds
    :param check_after: seconds to wait before checking the osd is down
    :param should_be_down: when False, only inject and return
    """
    the_one = random.choice(self.live_osds)
    self.log("inject_pause on {osd}".format(osd=the_one))
    self.log(
        "Testing {key} pause injection for duration {duration}".format(
            key=conf_key,
            duration=duration
            ))
    self.log(
        "Checking after {after}, should_be_down={shouldbedown}".format(
            after=check_after,
            shouldbedown=should_be_down
            ))
    self.ceph_manager.set_config(the_one, **{conf_key: duration})
    if not should_be_down:
        return
    time.sleep(check_after)
    status = self.ceph_manager.get_osd_status()
    assert the_one in status['down']
    # after the pause expires (plus slack) the osd must be back up
    time.sleep(duration - check_after + 20)
    status = self.ceph_manager.get_osd_status()
    assert not the_one in status['down']
759 def test_backfill_full(self
):
761 Test backfills stopping when the replica fills up.
763 First, use injectfull admin command to simulate a now full
764 osd by setting it to 0 on all of the OSDs.
766 Second, on a random subset, set
767 osd_debug_skip_full_check_in_backfill_reservation to force
768 the more complicated check in do_scan to be exercised.
770 Then, verify that all backfills stop.
772 self
.log("injecting backfill full")
773 for i
in self
.live_osds
:
774 self
.ceph_manager
.set_config(
776 osd_debug_skip_full_check_in_backfill_reservation
=
777 random
.choice(['false', 'true']))
778 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'backfillfull'],
779 check_status
=True, timeout
=30, stdout
=DEVNULL
)
781 status
= self
.ceph_manager
.compile_pg_status()
782 if 'backfill' not in status
.keys():
785 "waiting for {still_going} backfills".format(
786 still_going
=status
.get('backfill')))
788 assert('backfill' not in self
.ceph_manager
.compile_pg_status().keys())
789 for i
in self
.live_osds
:
790 self
.ceph_manager
.set_config(
792 osd_debug_skip_full_check_in_backfill_reservation
='false')
793 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'none'],
794 check_status
=True, timeout
=30, stdout
=DEVNULL
)
796 def test_map_discontinuity(self
):
798 1) Allows the osds to recover
800 3) allows the remaining osds to recover
801 4) waits for some time
803 This sequence should cause the revived osd to have to handle
804 a map gap since the mons would have trimmed
806 while len(self
.in_osds
) < (self
.minin
+ 1):
808 self
.log("Waiting for recovery")
809 self
.ceph_manager
.wait_for_all_osds_up(
810 timeout
=self
.config
.get('timeout')
812 # now we wait 20s for the pg status to change, if it takes longer,
813 # the test *should* fail!
815 self
.ceph_manager
.wait_for_clean(
816 timeout
=self
.config
.get('timeout')
819 # now we wait 20s for the backfill replicas to hear about the clean
821 self
.log("Recovered, killing an osd")
822 self
.kill_osd(mark_down
=True, mark_out
=True)
823 self
.log("Waiting for clean again")
824 self
.ceph_manager
.wait_for_clean(
825 timeout
=self
.config
.get('timeout')
827 self
.log("Waiting for trim")
828 time
.sleep(int(self
.config
.get("map_discontinuity_sleep_time", 40)))
831 def choose_action(self
):
833 Random action selector.
835 chance_down
= self
.config
.get('chance_down', 0.4)
836 chance_test_min_size
= self
.config
.get('chance_test_min_size', 0)
837 chance_test_backfill_full
= \
838 self
.config
.get('chance_test_backfill_full', 0)
839 if isinstance(chance_down
, int):
840 chance_down
= float(chance_down
) / 100
842 minout
= self
.config
.get("min_out", 0)
843 minlive
= self
.config
.get("min_live", 2)
844 mindead
= self
.config
.get("min_dead", 0)
846 self
.log('choose_action: min_in %d min_out '
847 '%d min_live %d min_dead %d' %
848 (minin
, minout
, minlive
, mindead
))
850 if len(self
.in_osds
) > minin
:
851 actions
.append((self
.out_osd
, 1.0,))
852 if len(self
.live_osds
) > minlive
and chance_down
> 0:
853 actions
.append((self
.kill_osd
, chance_down
,))
854 if len(self
.dead_osds
) > 1:
855 actions
.append((self
.rm_past_intervals
, 1.0,))
856 if len(self
.out_osds
) > minout
:
857 actions
.append((self
.in_osd
, 1.7,))
858 if len(self
.dead_osds
) > mindead
:
859 actions
.append((self
.revive_osd
, 1.0,))
860 if self
.config
.get('thrash_primary_affinity', True):
861 actions
.append((self
.primary_affinity
, 1.0,))
862 actions
.append((self
.reweight_osd_or_by_util
,
863 self
.config
.get('reweight_osd', .5),))
864 actions
.append((self
.grow_pool
,
865 self
.config
.get('chance_pgnum_grow', 0),))
866 actions
.append((self
.fix_pgp_num
,
867 self
.config
.get('chance_pgpnum_fix', 0),))
868 actions
.append((self
.test_pool_min_size
,
869 chance_test_min_size
,))
870 actions
.append((self
.test_backfill_full
,
871 chance_test_backfill_full
,))
872 if self
.chance_thrash_cluster_full
> 0:
873 actions
.append((self
.thrash_cluster_full
, self
.chance_thrash_cluster_full
,))
874 if self
.chance_thrash_pg_upmap
> 0:
875 actions
.append((self
.thrash_pg_upmap
, self
.chance_thrash_pg_upmap
,))
876 if self
.chance_thrash_pg_upmap_items
> 0:
877 actions
.append((self
.thrash_pg_upmap_items
, self
.chance_thrash_pg_upmap_items
,))
878 if self
.chance_force_recovery
> 0:
879 actions
.append((self
.force_cancel_recovery
, self
.chance_force_recovery
))
881 for key
in ['heartbeat_inject_failure', 'filestore_inject_stall']:
884 self
.inject_pause(key
,
885 self
.config
.get('pause_short', 3),
888 self
.config
.get('chance_inject_pause_short', 1),),
890 self
.inject_pause(key
,
891 self
.config
.get('pause_long', 80),
892 self
.config
.get('pause_check_after', 70),
894 self
.config
.get('chance_inject_pause_long', 0),)]:
895 actions
.append(scenario
)
897 total
= sum([y
for (x
, y
) in actions
])
898 val
= random
.uniform(0, total
)
899 for (action
, prob
) in actions
:
911 self
.log(traceback
.format_exc())
918 Loops and sends signal.SIGHUP to a random live osd.
920 Loop delay is controlled by the config value sighup_delay.
922 delay
= float(self
.sighup_delay
)
923 self
.log("starting do_sighup with a delay of {0}".format(delay
))
924 while not self
.stopping
:
925 osd
= random
.choice(self
.live_osds
)
926 self
.ceph_manager
.signal_osd(osd
, signal
.SIGHUP
, silent
=True)
930 def do_optrack_toggle(self
):
932 Loops and toggle op tracking to all osds.
934 Loop delay is controlled by the config value optrack_toggle_delay.
936 delay
= float(self
.optrack_toggle_delay
)
938 self
.log("starting do_optrack_toggle with a delay of {0}".format(delay
))
939 while not self
.stopping
:
940 if osd_state
== "true":
944 self
.ceph_manager
.raw_cluster_cmd_result('tell', 'osd.*',
945 'injectargs', '--osd_enable_op_tracker=%s' % osd_state
)
949 def do_dump_ops(self
):
951 Loops and does op dumps on all osds
953 self
.log("starting do_dump_ops")
954 while not self
.stopping
:
955 for osd
in self
.live_osds
:
956 # Ignore errors because live_osds is in flux
957 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_ops_in_flight'],
958 check_status
=False, timeout
=30, stdout
=DEVNULL
)
959 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_blocked_ops'],
960 check_status
=False, timeout
=30, stdout
=DEVNULL
)
961 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_historic_ops'],
962 check_status
=False, timeout
=30, stdout
=DEVNULL
)
966 def do_noscrub_toggle(self
):
968 Loops and toggle noscrub flags
970 Loop delay is controlled by the config value noscrub_toggle_delay.
972 delay
= float(self
.noscrub_toggle_delay
)
974 self
.log("starting do_noscrub_toggle with a delay of {0}".format(delay
))
975 while not self
.stopping
:
976 if scrub_state
== "none":
977 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'noscrub')
978 scrub_state
= "noscrub"
979 elif scrub_state
== "noscrub":
980 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
982 elif scrub_state
== "both":
983 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
984 scrub_state
= "nodeep-scrub"
986 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
989 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
990 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
995 Loop to select random actions to thrash ceph manager with.
997 cleanint
= self
.config
.get("clean_interval", 60)
998 scrubint
= self
.config
.get("scrub_interval", -1)
999 maxdead
= self
.config
.get("max_dead", 0)
1000 delay
= self
.config
.get("op_delay", 5)
1001 self
.rerrosd
= self
.live_osds
[0]
1002 if self
.random_eio
> 0:
1003 self
.ceph_manager
.raw_cluster_cmd('tell', 'osd.'+str(self
.rerrosd
),
1004 'injectargs', '--', '--filestore_debug_random_read_err='+str(self
.random_eio
))
1005 self
.ceph_manager
.raw_cluster_cmd('tell', 'osd.'+str(self
.rerrosd
),
1006 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self
.random_eio
))
1007 self
.log("starting do_thrash")
1008 while not self
.stopping
:
1009 to_log
= [str(x
) for x
in ["in_osds: ", self
.in_osds
,
1010 "out_osds: ", self
.out_osds
,
1011 "dead_osds: ", self
.dead_osds
,
1012 "live_osds: ", self
.live_osds
]]
1013 self
.log(" ".join(to_log
))
1014 if random
.uniform(0, 1) < (float(delay
) / cleanint
):
1015 while len(self
.dead_osds
) > maxdead
:
1017 for osd
in self
.in_osds
:
1018 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
1020 if random
.uniform(0, 1) < float(
1021 self
.config
.get('chance_test_map_discontinuity', 0)):
1022 self
.test_map_discontinuity()
1024 self
.ceph_manager
.wait_for_recovery(
1025 timeout
=self
.config
.get('timeout')
1027 time
.sleep(self
.clean_wait
)
1029 if random
.uniform(0, 1) < (float(delay
) / scrubint
):
1030 self
.log('Scrubbing while thrashing being performed')
1031 Scrubber(self
.ceph_manager
, self
.config
)
1032 self
.choose_action()()
1034 if self
.random_eio
> 0:
1035 self
.ceph_manager
.raw_cluster_cmd('tell', 'osd.'+str(self
.rerrosd
),
1036 'injectargs', '--', '--filestore_debug_random_read_err=0.0')
1037 self
.ceph_manager
.raw_cluster_cmd('tell', 'osd.'+str(self
.rerrosd
),
1038 'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
1039 for pool
in list(self
.pools_to_fix_pgp_num
):
1040 if self
.ceph_manager
.get_pool_pg_num(pool
) > 0:
1041 self
.fix_pgp_num(pool
)
1042 self
.pools_to_fix_pgp_num
.clear()
1043 for service
, opt
, saved_value
in self
.saved_options
:
1044 self
._set
_config
(service
, '*', opt
, saved_value
)
1045 self
.saved_options
= []
1049 class ObjectStoreTool
:
1051 def __init__(self
, manager
, pool
, **kwargs
):
1052 self
.manager
= manager
1054 self
.osd
= kwargs
.get('osd', None)
1055 self
.object_name
= kwargs
.get('object_name', None)
1056 self
.do_revive
= kwargs
.get('do_revive', True)
1057 if self
.osd
and self
.pool
and self
.object_name
:
1058 if self
.osd
== "primary":
1059 self
.osd
= self
.manager
.get_object_primary(self
.pool
,
1062 if self
.object_name
:
1063 self
.pgid
= self
.manager
.get_object_pg_with_shard(self
.pool
,
1066 self
.remote
= self
.manager
.ctx
.\
1067 cluster
.only('osd.{o}'.format(o
=self
.osd
)).remotes
.keys()[0]
1068 path
= self
.manager
.get_filepath().format(id=self
.osd
)
1069 self
.paths
= ("--data-path {path} --journal-path {path}/journal".
1072 def build_cmd(self
, options
, args
, stdin
):
1074 if self
.object_name
:
1075 lines
.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1076 "{paths} --pgid {pgid} --op list |"
1077 "grep '\"oid\":\"{name}\"')".
1078 format(paths
=self
.paths
,
1080 name
=self
.object_name
))
1081 args
= '"$object" ' + args
1082 options
+= " --pgid {pgid}".format(pgid
=self
.pgid
)
1083 cmd
= ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1084 format(paths
=self
.paths
,
1088 cmd
= ("echo {payload} | base64 --decode | {cmd}".
1089 format(payload
=base64
.encode(stdin
),
1092 return "\n".join(lines
)
1094 def run(self
, options
, args
, stdin
=None, stdout
=None):
1097 self
.manager
.kill_osd(self
.osd
)
1098 cmd
= self
.build_cmd(options
, args
, stdin
)
1099 self
.manager
.log(cmd
)
1101 proc
= self
.remote
.run(args
=['bash', '-e', '-x', '-c', cmd
],
1106 if proc
.exitstatus
!= 0:
1107 self
.manager
.log("failed with " + str(proc
.exitstatus
))
1108 error
= proc
.stdout
.getvalue() + " " + proc
.stderr
.getvalue()
1109 raise Exception(error
)
1112 self
.manager
.revive_osd(self
.osd
)
1113 self
.manager
.wait_till_osd_is_up(self
.osd
, 300)
1118 Ceph manager object.
1119 Contains several local functions that form a bulk of this module.
1121 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1126 ERASURE_CODED_POOL
= 3
1128 def __init__(self
, controller
, ctx
=None, config
=None, logger
=None,
1130 self
.lock
= threading
.RLock()
1132 self
.config
= config
1133 self
.controller
= controller
1134 self
.next_pool_id
= 0
1135 self
.cluster
= cluster
1137 self
.log
= lambda x
: logger
.info(x
)
1141 implement log behavior.
1145 if self
.config
is None:
1146 self
.config
= dict()
1147 pools
= self
.list_pools()
1150 # we may race with a pool deletion; ignore failures here
1152 self
.pools
[pool
] = self
.get_pool_property(pool
, 'pg_num')
1153 except CommandFailedError
:
1154 self
.log('Failed to get pg_num from pool %s, ignoring' % pool
)
1156 def raw_cluster_cmd(self
, *args
):
1158 Start ceph on a raw cluster. Return count
1160 testdir
= teuthology
.get_testdir(self
.ctx
)
1165 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1172 ceph_args
.extend(args
)
1173 proc
= self
.controller
.run(
1177 return proc
.stdout
.getvalue()
1179 def raw_cluster_cmd_result(self
, *args
):
1181 Start ceph on a cluster. Return success or failure information.
1183 testdir
= teuthology
.get_testdir(self
.ctx
)
1188 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1195 ceph_args
.extend(args
)
1196 proc
= self
.controller
.run(
1200 return proc
.exitstatus
1202 def run_ceph_w(self
):
1204 Execute "ceph -w" in the background with stdout connected to a StringIO,
1205 and return the RemoteProcess.
1207 return self
.controller
.run(
1215 wait
=False, stdout
=StringIO(), stdin
=run
.PIPE
)
1217 def flush_pg_stats(self
, osds
, no_wait
=None, wait_for_mon
=300):
1219 Flush pg stats from a list of OSD ids, ensuring they are reflected
1220 all the way to the monitor. Luminous and later only.
1222 :param osds: list of OSDs to flush
1223 :param no_wait: list of OSDs not to wait for seq id. by default, we
1224 wait for all specified osds, but some of them could be
1225 moved out of osdmap, so we cannot get their updated
1226 stat seq from monitor anymore. in that case, you need
1227 to pass a blacklist.
1228 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1229 it. (5 min by default)
1231 seq
= {osd
: self
.raw_cluster_cmd('tell', 'osd.%d' % osd
, 'flush_pg_stats')
1233 if not wait_for_mon
:
1237 for osd
, need
in seq
.iteritems():
1241 while wait_for_mon
> 0:
1242 got
= self
.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd
)
1243 self
.log('need seq {need} got {got} for osd.{osd}'.format(
1244 need
=need
, got
=got
, osd
=osd
))
1249 wait_for_mon
-= A_WHILE
1251 raise Exception('timed out waiting for mon to be updated with '
1252 'osd.{osd}: {got} < {need}'.
1253 format(osd
=osd
, got
=got
, need
=need
))
1255 def flush_all_pg_stats(self
):
1256 self
.flush_pg_stats(range(len(self
.get_osd_dump())))
1258 def do_rados(self
, remote
, cmd
, check_status
=True):
1260 Execute a remote rados command.
1262 testdir
= teuthology
.get_testdir(self
.ctx
)
1266 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1275 check_status
=check_status
1279 def rados_write_objects(self
, pool
, num_objects
, size
,
1280 timelimit
, threads
, cleanup
=False):
1283 Threads not used yet.
1287 '--num-objects', num_objects
,
1293 args
.append('--no-cleanup')
1294 return self
.do_rados(self
.controller
, map(str, args
))
1296 def do_put(self
, pool
, obj
, fname
, namespace
=None):
1298 Implement rados put operation
1301 if namespace
is not None:
1302 args
+= ['-N', namespace
]
1308 return self
.do_rados(
1314 def do_get(self
, pool
, obj
, fname
='/dev/null', namespace
=None):
1316 Implement rados get operation
1319 if namespace
is not None:
1320 args
+= ['-N', namespace
]
1326 return self
.do_rados(
1332 def do_rm(self
, pool
, obj
, namespace
=None):
1334 Implement rados rm operation
1337 if namespace
is not None:
1338 args
+= ['-N', namespace
]
1343 return self
.do_rados(
1349 def osd_admin_socket(self
, osd_id
, command
, check_status
=True, timeout
=0, stdout
=None):
1352 return self
.admin_socket('osd', osd_id
, command
, check_status
, timeout
, stdout
)
1354 def find_remote(self
, service_type
, service_id
):
1356 Get the Remote for the host where a particular service runs.
1358 :param service_type: 'mds', 'osd', 'client'
1359 :param service_id: The second part of a role, e.g. '0' for
1361 :return: a Remote instance for the host where the
1362 requested role is placed
1364 return get_remote(self
.ctx
, self
.cluster
,
1365 service_type
, service_id
)
1367 def admin_socket(self
, service_type
, service_id
,
1368 command
, check_status
=True, timeout
=0, stdout
=None):
1370 Remotely start up ceph specifying the admin socket
1371 :param command: a list of words to use as the command
1376 testdir
= teuthology
.get_testdir(self
.ctx
)
1377 remote
= self
.find_remote(service_type
, service_id
)
1382 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1389 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1390 cluster
=self
.cluster
,
1394 args
.extend(command
)
1399 check_status
=check_status
1402 def objectstore_tool(self
, pool
, options
, args
, **kwargs
):
1403 return ObjectStoreTool(self
, pool
, **kwargs
).run(options
, args
)
1405 def get_pgid(self
, pool
, pgnum
):
1407 :param pool: pool name
1408 :param pgnum: pg number
1409 :returns: a string representing this pg.
1411 poolnum
= self
.get_pool_num(pool
)
1412 pg_str
= "{poolnum}.{pgnum}".format(
1417 def get_pg_replica(self
, pool
, pgnum
):
1419 get replica for pool, pgnum (e.g. (data, 0)->0
1421 pg_str
= self
.get_pgid(pool
, pgnum
)
1422 output
= self
.raw_cluster_cmd("pg", "map", pg_str
, '--format=json')
1423 j
= json
.loads('\n'.join(output
.split('\n')[1:]))
1424 return int(j
['acting'][-1])
1427 def wait_for_pg_stats(func
):
1428 # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1429 # by default, and take the faulty injection in ms into consideration,
1430 # 12 seconds are more than enough
1431 delays
= [1, 1, 2, 3, 5, 8, 13]
1433 def wrapper(self
, *args
, **kwargs
):
1435 for delay
in delays
:
1437 return func(self
, *args
, **kwargs
)
1438 except AssertionError as e
:
1444 def get_pg_primary(self
, pool
, pgnum
):
1446 get primary for pool, pgnum (e.g. (data, 0)->0
1448 pg_str
= self
.get_pgid(pool
, pgnum
)
1449 output
= self
.raw_cluster_cmd("pg", "map", pg_str
, '--format=json')
1450 j
= json
.loads('\n'.join(output
.split('\n')[1:]))
1451 return int(j
['acting'][0])
1454 def get_pool_num(self
, pool
):
1456 get number for pool (e.g., data -> 2)
1458 return int(self
.get_pool_dump(pool
)['pool'])
1460 def list_pools(self
):
1464 osd_dump
= self
.get_osd_dump_json()
1465 self
.log(osd_dump
['pools'])
1466 return [str(i
['pool_name']) for i
in osd_dump
['pools']]
1468 def clear_pools(self
):
1472 [self
.remove_pool(i
) for i
in self
.list_pools()]
1474 def kick_recovery_wq(self
, osdnum
):
1476 Run kick_recovery_wq on cluster.
1478 return self
.raw_cluster_cmd(
1479 'tell', "osd.%d" % (int(osdnum
),),
1484 def wait_run_admin_socket(self
, service_type
,
1485 service_id
, args
=['version'], timeout
=75, stdout
=None):
1487 If osd_admin_socket call suceeds, return. Otherwise wait
1488 five seconds and try again.
1494 proc
= self
.admin_socket(service_type
, service_id
,
1495 args
, check_status
=False, stdout
=stdout
)
1496 if proc
.exitstatus
is 0:
1500 if (tries
* 5) > timeout
:
1501 raise Exception('timed out waiting for admin_socket '
1502 'to appear after {type}.{id} restart'.
1503 format(type=service_type
,
1505 self
.log("waiting on admin_socket for {type}-{id}, "
1506 "{command}".format(type=service_type
,
1511 def get_pool_dump(self
, pool
):
1513 get the osd dump part of a pool
1515 osd_dump
= self
.get_osd_dump_json()
1516 for i
in osd_dump
['pools']:
1517 if i
['pool_name'] == pool
:
1521 def get_config(self
, service_type
, service_id
, name
):
1523 :param node: like 'mon.a'
1524 :param name: the option name
1526 proc
= self
.wait_run_admin_socket(service_type
, service_id
,
1528 j
= json
.loads(proc
.stdout
.getvalue())
1531 def set_config(self
, osdnum
, **argdict
):
1533 :param osdnum: osd number
1534 :param argdict: dictionary containing values to set.
1536 for k
, v
in argdict
.iteritems():
1537 self
.wait_run_admin_socket(
1539 ['config', 'set', str(k
), str(v
)])
1541 def raw_cluster_status(self
):
1543 Get status from cluster
1545 status
= self
.raw_cluster_cmd('status', '--format=json-pretty')
1546 return json
.loads(status
)
1548 def raw_osd_status(self
):
1550 Get osd status from cluster
1552 return self
.raw_cluster_cmd('osd', 'dump')
1554 def get_osd_status(self
):
1556 Get osd statuses sorted by states that the osds are in.
1559 lambda x
: x
.startswith('osd.') and (("up" in x
) or ("down" in x
)),
1560 self
.raw_osd_status().split('\n'))
1562 in_osds
= [int(i
[4:].split()[0])
1563 for i
in filter(lambda x
: " in " in x
, osd_lines
)]
1564 out_osds
= [int(i
[4:].split()[0])
1565 for i
in filter(lambda x
: " out " in x
, osd_lines
)]
1566 up_osds
= [int(i
[4:].split()[0])
1567 for i
in filter(lambda x
: " up " in x
, osd_lines
)]
1568 down_osds
= [int(i
[4:].split()[0])
1569 for i
in filter(lambda x
: " down " in x
, osd_lines
)]
1570 dead_osds
= [int(x
.id_
)
1571 for x
in filter(lambda x
:
1574 iter_daemons_of_role('osd', self
.cluster
))]
1575 live_osds
= [int(x
.id_
) for x
in
1578 self
.ctx
.daemons
.iter_daemons_of_role('osd',
1580 return {'in': in_osds
, 'out': out_osds
, 'up': up_osds
,
1581 'down': down_osds
, 'dead': dead_osds
, 'live': live_osds
,
1584 def get_num_pgs(self
):
1586 Check cluster status for the number of pgs
1588 status
= self
.raw_cluster_status()
1590 return status
['pgmap']['num_pgs']
1592 def create_erasure_code_profile(self
, profile_name
, profile
):
1594 Create an erasure code profile name that can be used as a parameter
1595 when creating an erasure coded pool.
1598 args
= cmd_erasure_code_profile(profile_name
, profile
)
1599 self
.raw_cluster_cmd(*args
)
1601 def create_pool_with_unique_name(self
, pg_num
=16,
1602 erasure_code_profile_name
=None,
1604 erasure_code_use_overwrites
=False):
1606 Create a pool named unique_pool_X where X is unique.
1610 name
= "unique_pool_%s" % (str(self
.next_pool_id
),)
1611 self
.next_pool_id
+= 1
1615 erasure_code_profile_name
=erasure_code_profile_name
,
1617 erasure_code_use_overwrites
=erasure_code_use_overwrites
)
1620 @contextlib.contextmanager
1621 def pool(self
, pool_name
, pg_num
=16, erasure_code_profile_name
=None):
1622 self
.create_pool(pool_name
, pg_num
, erasure_code_profile_name
)
1624 self
.remove_pool(pool_name
)
1626 def create_pool(self
, pool_name
, pg_num
=16,
1627 erasure_code_profile_name
=None,
1629 erasure_code_use_overwrites
=False):
1631 Create a pool named from the pool_name parameter.
1632 :param pool_name: name of the pool being created.
1633 :param pg_num: initial number of pgs.
1634 :param erasure_code_profile_name: if set and !None create an
1635 erasure coded pool using the profile
1636 :param erasure_code_use_overwrites: if true, allow overwrites
1639 assert isinstance(pool_name
, basestring
)
1640 assert isinstance(pg_num
, int)
1641 assert pool_name
not in self
.pools
1642 self
.log("creating pool_name %s" % (pool_name
,))
1643 if erasure_code_profile_name
:
1644 self
.raw_cluster_cmd('osd', 'pool', 'create',
1645 pool_name
, str(pg_num
), str(pg_num
),
1646 'erasure', erasure_code_profile_name
)
1648 self
.raw_cluster_cmd('osd', 'pool', 'create',
1649 pool_name
, str(pg_num
))
1650 if min_size
is not None:
1651 self
.raw_cluster_cmd(
1652 'osd', 'pool', 'set', pool_name
,
1655 if erasure_code_use_overwrites
:
1656 self
.raw_cluster_cmd(
1657 'osd', 'pool', 'set', pool_name
,
1658 'allow_ec_overwrites',
1660 self
.raw_cluster_cmd(
1661 'osd', 'pool', 'application', 'enable',
1662 pool_name
, 'rados', '--yes-i-really-mean-it',
1663 run
.Raw('||'), 'true')
1664 self
.pools
[pool_name
] = pg_num
1667 def add_pool_snap(self
, pool_name
, snap_name
):
1670 :param pool_name: name of pool to snapshot
1671 :param snap_name: name of snapshot to take
1673 self
.raw_cluster_cmd('osd', 'pool', 'mksnap',
1674 str(pool_name
), str(snap_name
))
1676 def remove_pool_snap(self
, pool_name
, snap_name
):
1678 Remove pool snapshot
1679 :param pool_name: name of pool to snapshot
1680 :param snap_name: name of snapshot to remove
1682 self
.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1683 str(pool_name
), str(snap_name
))
def remove_pool(self, pool_name):
    """
    Delete the named pool from the cluster and drop it from our
    local bookkeeping.
    :param pool_name: pool to remove; must be tracked in self.pools
    """
    assert isinstance(pool_name, basestring)
    assert pool_name in self.pools
    self.log("removing pool_name %s" % (pool_name,))
    del self.pools[pool_name]
    rm_args = ['rmpool', pool_name, pool_name,
               "--yes-i-really-really-mean-it"]
    self.do_rados(self.controller, rm_args)
1704 return random
.choice(self
.pools
.keys())
1706 def get_pool_pg_num(self
, pool_name
):
1708 Return the number of pgs in the pool specified.
1711 assert isinstance(pool_name
, basestring
)
1712 if pool_name
in self
.pools
:
1713 return self
.pools
[pool_name
]
1716 def get_pool_property(self
, pool_name
, prop
):
1718 :param pool_name: pool
1719 :param prop: property to be checked.
1720 :returns: property as an int value.
1723 assert isinstance(pool_name
, basestring
)
1724 assert isinstance(prop
, basestring
)
1725 output
= self
.raw_cluster_cmd(
1731 return int(output
.split()[1])
1733 def set_pool_property(self
, pool_name
, prop
, val
):
1735 :param pool_name: pool
1736 :param prop: property to be set.
1737 :param val: value to set.
1739 This routine retries if set operation fails.
1742 assert isinstance(pool_name
, basestring
)
1743 assert isinstance(prop
, basestring
)
1744 assert isinstance(val
, int)
1747 r
= self
.raw_cluster_cmd_result(
1754 if r
!= 11: # EAGAIN
1758 raise Exception('timed out getting EAGAIN '
1759 'when setting pool property %s %s = %s' %
1760 (pool_name
, prop
, val
))
1761 self
.log('got EAGAIN setting pool property, '
1762 'waiting a few seconds...')
1765 def expand_pool(self
, pool_name
, by
, max_pgs
):
1767 Increase the number of pgs in a pool
1770 assert isinstance(pool_name
, basestring
)
1771 assert isinstance(by
, int)
1772 assert pool_name
in self
.pools
1773 if self
.get_num_creating() > 0:
1775 if (self
.pools
[pool_name
] + by
) > max_pgs
:
1777 self
.log("increase pool size by %d" % (by
,))
1778 new_pg_num
= self
.pools
[pool_name
] + by
1779 self
.set_pool_property(pool_name
, "pg_num", new_pg_num
)
1780 self
.pools
[pool_name
] = new_pg_num
1783 def set_pool_pgpnum(self
, pool_name
, force
):
1785 Set pgpnum property of pool_name pool.
1788 assert isinstance(pool_name
, basestring
)
1789 assert pool_name
in self
.pools
1790 if not force
and self
.get_num_creating() > 0:
1792 self
.set_pool_property(pool_name
, 'pgp_num', self
.pools
[pool_name
])
1795 def list_pg_missing(self
, pgid
):
1797 return list of missing pgs with the id specified
1802 out
= self
.raw_cluster_cmd('--', 'pg', pgid
, 'list_missing',
1808 r
['objects'].extend(j
['objects'])
1813 offset
= j
['objects'][-1]['oid']
1818 def get_pg_stats(self
):
1820 Dump the cluster and get pg stats
1822 out
= self
.raw_cluster_cmd('pg', 'dump', '--format=json')
1823 j
= json
.loads('\n'.join(out
.split('\n')[1:]))
1824 return j
['pg_stats']
1826 def get_pgids_to_force(self
, backfill
):
1828 Return the randomized list of PGs that can have their recovery/backfill forced
1830 j
= self
.get_pg_stats();
1833 wanted
= ['degraded', 'backfilling', 'backfill_wait']
1835 wanted
= ['recovering', 'degraded', 'recovery_wait']
1837 status
= pg
['state'].split('+')
1839 if random
.random() > 0.5 and not ('forced_backfill' in status
or 'forced_recovery' in status
) and t
in status
:
1840 pgids
.append(pg
['pgid'])
1844 def get_pgids_to_cancel_force(self
, backfill
):
1846 Return the randomized list of PGs whose recovery/backfill priority is forced
1848 j
= self
.get_pg_stats();
1851 wanted
= 'forced_backfill'
1853 wanted
= 'forced_recovery'
1855 status
= pg
['state'].split('+')
1856 if wanted
in status
and random
.random() > 0.5:
1857 pgids
.append(pg
['pgid'])
1860 def compile_pg_status(self
):
1862 Return a histogram of pg state values
1865 j
= self
.get_pg_stats()
1867 for status
in pg
['state'].split('+'):
1868 if status
not in ret
:
1874 def with_pg_state(self
, pool
, pgnum
, check
):
1875 pgstr
= self
.get_pgid(pool
, pgnum
)
1876 stats
= self
.get_single_pg_stats(pgstr
)
1877 assert(check(stats
['state']))
1880 def with_pg(self
, pool
, pgnum
, check
):
1881 pgstr
= self
.get_pgid(pool
, pgnum
)
1882 stats
= self
.get_single_pg_stats(pgstr
)
1885 def get_last_scrub_stamp(self
, pool
, pgnum
):
1887 Get the timestamp of the last scrub.
1889 stats
= self
.get_single_pg_stats(self
.get_pgid(pool
, pgnum
))
1890 return stats
["last_scrub_stamp"]
1892 def do_pg_scrub(self
, pool
, pgnum
, stype
):
1894 Scrub pg and wait for scrubbing to finish
1896 init
= self
.get_last_scrub_stamp(pool
, pgnum
)
1897 RESEND_TIMEOUT
= 120 # Must be a multiple of SLEEP_TIME
1898 FATAL_TIMEOUT
= RESEND_TIMEOUT
* 3
1901 while init
== self
.get_last_scrub_stamp(pool
, pgnum
):
1902 assert timer
< FATAL_TIMEOUT
, "fatal timeout trying to " + stype
1903 self
.log("waiting for scrub type %s" % (stype
,))
1904 if (timer
% RESEND_TIMEOUT
) == 0:
1905 self
.raw_cluster_cmd('pg', stype
, self
.get_pgid(pool
, pgnum
))
1906 # The first time in this loop is the actual request
1907 if timer
!= 0 and stype
== "repair":
1908 self
.log("WARNING: Resubmitted a non-idempotent repair")
1909 time
.sleep(SLEEP_TIME
)
1912 def wait_snap_trimming_complete(self
, pool
):
1914 Wait for snap trimming on pool to end
1919 poolnum
= self
.get_pool_num(pool
)
1920 poolnumstr
= "%s." % (poolnum
,)
1923 if (now
- start
) > FATAL_TIMEOUT
:
1924 assert (now
- start
) < FATAL_TIMEOUT
, \
1925 'failed to complete snap trimming before timeout'
1926 all_stats
= self
.get_pg_stats()
1928 for pg
in all_stats
:
1929 if (poolnumstr
in pg
['pgid']) and ('snaptrim' in pg
['state']):
1930 self
.log("pg {pg} in trimming, state: {state}".format(
1936 self
.log("{pool} still trimming, waiting".format(pool
=pool
))
1937 time
.sleep(POLL_PERIOD
)
1939 def get_single_pg_stats(self
, pgid
):
1941 Return pg for the pgid specified.
1943 all_stats
= self
.get_pg_stats()
1945 for pg
in all_stats
:
1946 if pg
['pgid'] == pgid
:
1951 def get_object_pg_with_shard(self
, pool
, name
, osdid
):
1954 pool_dump
= self
.get_pool_dump(pool
)
1955 object_map
= self
.get_object_map(pool
, name
)
1956 if pool_dump
["type"] == CephManager
.ERASURE_CODED_POOL
:
1957 shard
= object_map
['acting'].index(osdid
)
1958 return "{pgid}s{shard}".format(pgid
=object_map
['pgid'],
1961 return object_map
['pgid']
1963 def get_object_primary(self
, pool
, name
):
1966 object_map
= self
.get_object_map(pool
, name
)
1967 return object_map
['acting_primary']
1969 def get_object_map(self
, pool
, name
):
1971 osd map --format=json converted to a python object
1972 :returns: the python object
1974 out
= self
.raw_cluster_cmd('--format=json', 'osd', 'map', pool
, name
)
1975 return json
.loads('\n'.join(out
.split('\n')[1:]))
1977 def get_osd_dump_json(self
):
1979 osd dump --format=json converted to a python object
1980 :returns: the python object
1982 out
= self
.raw_cluster_cmd('osd', 'dump', '--format=json')
1983 return json
.loads('\n'.join(out
.split('\n')[1:]))
1985 def get_osd_dump(self
):
1990 return self
.get_osd_dump_json()['osds']
1992 def get_mgr_dump(self
):
1993 out
= self
.raw_cluster_cmd('mgr', 'dump', '--format=json')
1994 return json
.loads(out
)
1996 def get_stuck_pgs(self
, type_
, threshold
):
1998 :returns: stuck pg information from the cluster
2000 out
= self
.raw_cluster_cmd('pg', 'dump_stuck', type_
, str(threshold
),
2002 return json
.loads(out
)
2004 def get_num_unfound_objects(self
):
2006 Check cluster status to get the number of unfound objects
2008 status
= self
.raw_cluster_status()
2010 return status
['pgmap'].get('unfound_objects', 0)
2012 def get_num_creating(self
):
2014 Find the number of pgs in creating mode.
2016 pgs
= self
.get_pg_stats()
2019 if 'creating' in pg
['state']:
2023 def get_num_active_clean(self
):
2025 Find the number of active and clean pgs.
2027 pgs
= self
.get_pg_stats()
2030 if (pg
['state'].count('active') and
2031 pg
['state'].count('clean') and
2032 not pg
['state'].count('stale')):
2036 def get_num_active_recovered(self
):
2038 Find the number of active and recovered pgs.
2040 pgs
= self
.get_pg_stats()
2043 if (pg
['state'].count('active') and
2044 not pg
['state'].count('recover') and
2045 not pg
['state'].count('backfill') and
2046 not pg
['state'].count('stale')):
2050 def get_is_making_recovery_progress(self
):
2052 Return whether there is recovery progress discernable in the
2055 status
= self
.raw_cluster_status()
2056 kps
= status
['pgmap'].get('recovering_keys_per_sec', 0)
2057 bps
= status
['pgmap'].get('recovering_bytes_per_sec', 0)
2058 ops
= status
['pgmap'].get('recovering_objects_per_sec', 0)
2059 return kps
> 0 or bps
> 0 or ops
> 0
2061 def get_num_active(self
):
2063 Find the number of active pgs.
2065 pgs
= self
.get_pg_stats()
2068 if pg
['state'].count('active') and not pg
['state'].count('stale'):
2072 def get_num_down(self
):
2074 Find the number of pgs that are down.
2076 pgs
= self
.get_pg_stats()
2079 if ((pg
['state'].count('down') and not
2080 pg
['state'].count('stale')) or
2081 (pg
['state'].count('incomplete') and not
2082 pg
['state'].count('stale'))):
2086 def get_num_active_down(self
):
2088 Find the number of pgs that are either active or down.
2090 pgs
= self
.get_pg_stats()
2093 if ((pg
['state'].count('active') and not
2094 pg
['state'].count('stale')) or
2095 (pg
['state'].count('down') and not
2096 pg
['state'].count('stale')) or
2097 (pg
['state'].count('incomplete') and not
2098 pg
['state'].count('stale'))):
2104 True if all pgs are clean
2106 return self
.get_num_active_clean() == self
.get_num_pgs()
2108 def is_recovered(self
):
2110 True if all pgs have recovered
2112 return self
.get_num_active_recovered() == self
.get_num_pgs()
2114 def is_active_or_down(self
):
2116 True if all pgs are active or down
2118 return self
.get_num_active_down() == self
.get_num_pgs()
2120 def wait_for_clean(self
, timeout
=None):
2122 Returns true when all pgs are clean.
2124 self
.log("waiting for clean")
2126 num_active_clean
= self
.get_num_active_clean()
2127 while not self
.is_clean():
2128 if timeout
is not None:
2129 if self
.get_is_making_recovery_progress():
2130 self
.log("making progress, resetting timeout")
2133 self
.log("no progress seen, keeping timeout for now")
2134 if time
.time() - start
>= timeout
:
2135 self
.log('dumping pgs')
2136 out
= self
.raw_cluster_cmd('pg', 'dump')
2138 assert time
.time() - start
< timeout
, \
2139 'failed to become clean before timeout expired'
2140 cur_active_clean
= self
.get_num_active_clean()
2141 if cur_active_clean
!= num_active_clean
:
2143 num_active_clean
= cur_active_clean
2147 def are_all_osds_up(self
):
2149 Returns true if all osds are up.
2151 x
= self
.get_osd_dump()
2152 return (len(x
) == sum([(y
['up'] > 0) for y
in x
]))
2154 def wait_for_all_osds_up(self
, timeout
=None):
2156 When this exits, either the timeout has expired, or all
2159 self
.log("waiting for all up")
2161 while not self
.are_all_osds_up():
2162 if timeout
is not None:
2163 assert time
.time() - start
< timeout
, \
2164 'timeout expired in wait_for_all_osds_up'
2168 def pool_exists(self
, pool
):
2169 if pool
in self
.list_pools():
2173 def wait_for_pool(self
, pool
, timeout
=300):
2175 Wait for a pool to exist
2177 self
.log('waiting for pool %s to exist' % pool
)
2179 while not self
.pool_exists(pool
):
2180 if timeout
is not None:
2181 assert time
.time() - start
< timeout
, \
2182 'timeout expired in wait_for_pool'
2185 def wait_for_pools(self
, pools
):
2187 self
.wait_for_pool(pool
)
2189 def is_mgr_available(self
):
2190 x
= self
.get_mgr_dump()
2191 return x
.get('available', False)
2193 def wait_for_mgr_available(self
, timeout
=None):
2194 self
.log("waiting for mgr available")
2196 while not self
.is_mgr_available():
2197 if timeout
is not None:
2198 assert time
.time() - start
< timeout
, \
2199 'timeout expired in wait_for_mgr_available'
2201 self
.log("mgr available!")
2203 def wait_for_recovery(self
, timeout
=None):
2205 Check peering. When this exists, we have recovered.
2207 self
.log("waiting for recovery to complete")
2209 num_active_recovered
= self
.get_num_active_recovered()
2210 while not self
.is_recovered():
2212 if timeout
is not None:
2213 if self
.get_is_making_recovery_progress():
2214 self
.log("making progress, resetting timeout")
2217 self
.log("no progress seen, keeping timeout for now")
2218 if now
- start
>= timeout
:
2219 self
.log('dumping pgs')
2220 out
= self
.raw_cluster_cmd('pg', 'dump')
2222 assert now
- start
< timeout
, \
2223 'failed to recover before timeout expired'
2224 cur_active_recovered
= self
.get_num_active_recovered()
2225 if cur_active_recovered
!= num_active_recovered
:
2227 num_active_recovered
= cur_active_recovered
2229 self
.log("recovered!")
2231 def wait_for_active(self
, timeout
=None):
2233 Check peering. When this exists, we are definitely active
2235 self
.log("waiting for peering to complete")
2237 num_active
= self
.get_num_active()
2238 while not self
.is_active():
2239 if timeout
is not None:
2240 if time
.time() - start
>= timeout
:
2241 self
.log('dumping pgs')
2242 out
= self
.raw_cluster_cmd('pg', 'dump')
2244 assert time
.time() - start
< timeout
, \
2245 'failed to recover before timeout expired'
2246 cur_active
= self
.get_num_active()
2247 if cur_active
!= num_active
:
2249 num_active
= cur_active
2253 def wait_for_active_or_down(self
, timeout
=None):
2255 Check peering. When this exists, we are definitely either
2258 self
.log("waiting for peering to complete or become blocked")
2260 num_active_down
= self
.get_num_active_down()
2261 while not self
.is_active_or_down():
2262 if timeout
is not None:
2263 if time
.time() - start
>= timeout
:
2264 self
.log('dumping pgs')
2265 out
= self
.raw_cluster_cmd('pg', 'dump')
2267 assert time
.time() - start
< timeout
, \
2268 'failed to recover before timeout expired'
2269 cur_active_down
= self
.get_num_active_down()
2270 if cur_active_down
!= num_active_down
:
2272 num_active_down
= cur_active_down
2274 self
.log("active or down!")
2276 def osd_is_up(self
, osd
):
2278 Wrapper for osd check
2280 osds
= self
.get_osd_dump()
2281 return osds
[osd
]['up'] > 0
2283 def wait_till_osd_is_up(self
, osd
, timeout
=None):
2285 Loop waiting for osd.
2287 self
.log('waiting for osd.%d to be up' % osd
)
2289 while not self
.osd_is_up(osd
):
2290 if timeout
is not None:
2291 assert time
.time() - start
< timeout
, \
2292 'osd.%d failed to come up before timeout expired' % osd
2294 self
.log('osd.%d is up' % osd
)
2296 def is_active(self
):
2298 Wrapper to check if all pgs are active
2300 return self
.get_num_active() == self
.get_num_pgs()
2302 def wait_till_active(self
, timeout
=None):
2304 Wait until all pgs are active.
2306 self
.log("waiting till active")
2308 while not self
.is_active():
2309 if timeout
is not None:
2310 if time
.time() - start
>= timeout
:
2311 self
.log('dumping pgs')
2312 out
= self
.raw_cluster_cmd('pg', 'dump')
2314 assert time
.time() - start
< timeout
, \
2315 'failed to become active before timeout expired'
2319 def mark_out_osd(self
, osd
):
2321 Wrapper to mark osd out.
2323 self
.raw_cluster_cmd('osd', 'out', str(osd
))
2325 def kill_osd(self
, osd
):
2327 Kill osds by either power cycling (if indicated by the config)
2330 if self
.config
.get('powercycle'):
2331 remote
= self
.find_remote('osd', osd
)
2332 self
.log('kill_osd on osd.{o} '
2333 'doing powercycle of {s}'.format(o
=osd
, s
=remote
.name
))
2334 self
._assert
_ipmi
(remote
)
2335 remote
.console
.power_off()
2336 elif self
.config
.get('bdev_inject_crash') and self
.config
.get('bdev_inject_crash_probability'):
2337 if random
.uniform(0, 1) < self
.config
.get('bdev_inject_crash_probability', .5):
2338 self
.raw_cluster_cmd(
2339 '--', 'tell', 'osd.%d' % osd
,
2341 '--bdev-inject-crash %d' % self
.config
.get('bdev_inject_crash'),
2344 self
.ctx
.daemons
.get_daemon('osd', osd
, self
.cluster
).wait()
2348 raise RuntimeError('osd.%s did not fail' % osd
)
2350 self
.ctx
.daemons
.get_daemon('osd', osd
, self
.cluster
).stop()
2352 self
.ctx
.daemons
.get_daemon('osd', osd
, self
.cluster
).stop()
def _assert_ipmi(remote):
    """Fail fast unless the remote's console has IPMI credentials."""
    msg = ("powercycling requested but RemoteConsole is not "
           "initialized. Check ipmi config.")
    assert remote.console.has_ipmi_credentials, msg
2360 def blackhole_kill_osd(self
, osd
):
2362 Stop osd if nothing else works.
2364 self
.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd
,
2366 '--objectstore-blackhole')
2368 self
.ctx
.daemons
.get_daemon('osd', osd
, self
.cluster
).stop()
def revive_osd(self, osd, timeout=150, skip_admin_check=False):
    """
    Revive osds by either power cycling (if indicated by the config)
    or by restarting.

    :param osd: numeric id of the osd to revive
    :param timeout: seconds to wait for the admin socket to respond
    :param skip_admin_check: skip waiting for the admin socket
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('osd', osd)
        # Fixed copy-pasted log message: it previously said 'kill_osd'.
        self.log('revive_osd on osd.{o} doing powercycle of {s}'.
                 format(o=osd, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        if not remote.console.check_status(300):
            raise Exception('Failed to revive osd.{o} via ipmi'.
                            format(o=osd))
        # Re-establish ssh, remount the osd's data dir, and reset the
        # daemon bookkeeping before restarting it.
        teuthology.reconnect(self.ctx, 60, [remote])
        mount_osd_data(self.ctx, remote, self.cluster, str(osd))
        self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
    self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

    if not skip_admin_check:
        # wait for dump_ops_in_flight; this command doesn't appear
        # until after the signal handler is installed and it is safe
        # to stop the osd again without making valgrind leak checks
        # unhappy. see #5924.
        self.wait_run_admin_socket('osd', osd,
                                   args=['dump_ops_in_flight'],
                                   timeout=timeout, stdout=DEVNULL)
def mark_down_osd(self, osd):
    """
    Cluster command wrapper: mark the given osd down.

    :param osd: numeric id of the osd
    """
    cmd = ('osd', 'down', str(osd))
    self.raw_cluster_cmd(*cmd)
def mark_in_osd(self, osd):
    """
    Cluster command wrapper: mark the given osd back in.

    :param osd: numeric id of the osd
    """
    cmd = ('osd', 'in', str(osd))
    self.raw_cluster_cmd(*cmd)
def signal_osd(self, osd, sig, silent=False):
    """
    Send the given signal to the given osd via the local daemon
    registry.

    :param osd: numeric id of the osd
    :param sig: signal number to deliver
    :param silent: forwarded to the daemon's signal() call
    """
    daemon = self.ctx.daemons.get_daemon('osd', osd, self.cluster)
    daemon.signal(sig, silent=silent)
def signal_mon(self, mon, sig, silent=False):
    """
    Send the given signal to the given monitor via the local daemon
    registry.

    :param mon: monitor id
    :param sig: signal number to deliver
    :param silent: forwarded to the daemon's signal() call
    """
    daemon = self.ctx.daemons.get_daemon('mon', mon, self.cluster)
    daemon.signal(sig, silent=silent)
def kill_mon(self, mon):
    """
    Kill the monitor by either power cycling (if the config says so),
    or by doing a normal stop.

    :param mon: monitor id (e.g. 'a')
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('mon', mon)
        self.log('kill_mon on mon.{m} doing powercycle of {s}'.
                 format(m=mon, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_off()
    else:
        # Restored `else:` lost in extraction -- without it the daemon
        # stop would also run after a powercycle.
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
def revive_mon(self, mon):
    """
    Restart by either power cycling (if the config says so),
    or by doing a normal restart.

    :param mon: monitor id
    """
    if self.config.get('powercycle'):
        # `remote` exists only on this branch, so every use of it must
        # stay inside the conditional (otherwise: NameError).
        remote = self.find_remote('mon', mon)
        self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                 format(m=mon, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        self.make_admin_daemon_dir(remote)
    self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
def revive_mgr(self, mgr):
    """
    Restart by either power cycling (if the config says so),
    or by doing a normal restart.

    :param mgr: mgr daemon id
    """
    if self.config.get('powercycle'):
        # `remote` exists only on this branch, so every use of it must
        # stay inside the conditional (otherwise: NameError).
        remote = self.find_remote('mgr', mgr)
        self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
                 format(m=mgr, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        self.make_admin_daemon_dir(remote)
    self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
def get_mon_status(self, mon):
    """
    Extract all the monitor status information from the cluster.

    :param mon: monitor id
    :returns: parsed ``mon_status`` output
    """
    conf = self.ctx.ceph[self.cluster].conf
    addr = conf['mon.%s' % mon]['mon addr']
    raw = self.raw_cluster_cmd('-m', addr, 'mon_status')
    return json.loads(raw)
def get_mon_quorum(self):
    """
    Extract monitor quorum information from the cluster.

    :returns: list of monitor ranks currently in quorum
    """
    out = self.raw_cluster_cmd('quorum_status')
    # Restored: the JSON parse and return were lost in extraction;
    # callers (e.g. wait_for_mon_quorum_size) take len() of the result.
    j = json.loads(out)
    self.log('quorum_status is %s' % out)
    return j['quorum']
def wait_for_mon_quorum_size(self, size, timeout=300):
    """
    Loop until quorum size is reached.

    :param size: expected number of monitors in quorum
    :param timeout: seconds before asserting failure (None = forever)
    """
    self.log('waiting for quorum size %d' % size)
    # Restored: `start` is referenced below but its assignment was
    # lost in extraction, as was the polling sleep.
    start = time.time()
    while not len(self.get_mon_quorum()) == size:
        if timeout is not None:
            assert time.time() - start < timeout, \
                ('failed to reach quorum size %d '
                 'before timeout expired' % size)
        time.sleep(3)
    self.log("quorum is size %d" % size)
def get_mon_health(self, debug=False):
    """
    Extract all the monitor health information.

    :param debug: when True, log the raw health output
    :returns: parsed ``ceph health`` JSON
    """
    out = self.raw_cluster_cmd('health', '--format=json')
    if debug:
        # `debug` was accepted but never consulted; gate the log on it.
        self.log('health:\n{h}'.format(h=out))
    return json.loads(out)
def get_mds_status(self, mds):
    """
    Run cluster commands for the mds in order to get mds information.

    :param mds: mds daemon name to look up
    :returns: the info dict for the named mds, or None if not found
    """
    out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
    # First line of `mds dump` output is a human-readable banner; the
    # JSON document starts on the second line.
    j = json.loads(' '.join(out.splitlines()[1:]))
    # collate; for dup ids, larger gid wins.
    # .values() instead of py2-only .itervalues(): works on both 2 and 3
    # (the file already carries py3 shims, e.g. the DEVNULL import).
    for info in j['info'].values():
        if info['name'] == mds:
            return info
    return None
def get_filepath(self):
    """
    Return path to osd data with {id} needing to be replaced.
    """
    # The literal '{id}' placeholder is left for the caller to fill in.
    return '/var/lib/ceph/osd/{0}-{{id}}'.format(self.cluster)
def make_admin_daemon_dir(self, remote):
    """
    Create /var/run/ceph directory on remote site.

    :param remote: Remote site
    """
    # world-writable so any daemon user can drop admin sockets there
    cmd = ['sudo', 'install', '-d', '-m0777', '--', '/var/run/ceph']
    remote.run(args=cmd)
def utility_task(name):
    """
    Generate ceph_manager subtask corresponding to ceph_manager
    method name.

    :param name: CephManager method to expose as a teuthology task
    :returns: a ``task(ctx, config)`` callable
    """
    def task(ctx, config):
        if config is None:
            config = {}
        args = config.get('args', [])
        kwargs = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        fn = getattr(ctx.managers[cluster], name)
        # Restored: the method invocation and the `return task` below
        # were lost in extraction -- without them the factory is a no-op
        # that returns None.
        fn(*args, **kwargs)
    return task
# Expose selected CephManager methods as standalone teuthology tasks so
# test suite YAML can invoke them directly (e.g. `- kill_osd: ...`).
revive_osd = utility_task("revive_osd")
revive_mon = utility_task("revive_mon")
kill_osd = utility_task("kill_osd")
kill_mon = utility_task("kill_mon")
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
flush_all_pg_stats = utility_task("flush_all_pg_stats")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
wait_for_pool = utility_task("wait_for_pool")
wait_for_pools = utility_task("wait_for_pools")