"""
ceph manager -- Thrasher and CephManager objects
"""
from cStringIO import StringIO
from functools import wraps
import contextlib
import random
import signal
import time
import gevent
import base64
import json
import logging
import threading
import traceback
import os
from teuthology import misc as teuthology
from tasks.scrub import Scrubber
from util.rados import cmd_erasure_code_profile
from util import get_remote
from teuthology.contextutil import safe_while
from teuthology.orchestra.remote import Remote
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError

try:
    from subprocess import DEVNULL  # py3k
except ImportError:
    DEVNULL = open(os.devnull, 'r+')

DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'

log = logging.getLogger(__name__)
def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
    conf_fp = StringIO()
    ctx.ceph[cluster].conf.write(conf_fp)
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'python',
            '-c',
            ('import shutil, sys; '
             'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
            conf_path,
            run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
        ],
        stdin=run.PIPE,
        wait=False)
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)
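

# Illustrative sketch (not part of the original module): write_conf() renders the in-memory
# config to a StringIO buffer and then streams that single buffer into a remote
# "python -c 'copy stdin to file'" process on every host.  The helper below shows the same
# render-then-stream idea locally with ConfigParser and a subprocess; the section and option
# names are invented for the example.
def _example_render_and_stream_conf():
    import ConfigParser
    import subprocess

    conf = ConfigParser.RawConfigParser()
    conf.add_section('global')
    conf.set('global', 'mon_osd_down_out_interval', '0')

    buf = StringIO()
    conf.write(buf)      # render to memory, like ctx.ceph[cluster].conf.write(conf_fp)
    buf.seek(0)

    # feed the rendered text to a child process over stdin, analogous to
    # teuthology.feed_many_stdins_and_close(conf_fp, writes)
    proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    out, _ = proc.communicate(buf.getvalue())
    return out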
def mount_osd_data(ctx, remote, cluster, osd):
    """
    Mount a remote OSD

    :param ctx: Context
    :param remote: Remote site
    :param cluster: name of ceph cluster
    :param osd: Osd name
    """
    log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
    role = "{0}.osd.{1}".format(cluster, osd)
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    if remote in ctx.disk_config.remote_to_roles_to_dev:
        if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
            role = alt_role
        if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
            return
        dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
        mount_options = ctx.disk_config.\
            remote_to_roles_to_dev_mount_options[remote][role]
        fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
        mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))

        log.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
                 'mountpoint: {p}, type: {t}, options: {v}'.format(
                     o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
                     c=cluster))

        remote.run(
            args=[
                'sudo',
                'mount',
                '-t', fstype,
                '-o', ','.join(mount_options),
                dev,
                mnt,
            ]
            )
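

# Illustrative sketch (not part of the original module): mount_osd_data() looks up the device
# for an OSD in ctx.disk_config.remote_to_roles_to_dev, keyed first by remote and then by
# role.  For the default cluster the role may be stored either as "ceph.osd.N" or plain
# "osd.N", so the code falls back from one spelling to the other.  The data below is invented
# purely to show that lookup.
def _example_role_fallback(cluster='ceph', osd=0):
    remote_to_roles_to_dev = {'host1': {'osd.0': '/dev/sdb'}}
    remote = 'host1'
    role = "{0}.osd.{1}".format(cluster, osd)
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    roles = remote_to_roles_to_dev.get(remote, {})
    if alt_role in roles:
        role = alt_role
    return roles.get(role)   # '/dev/sdb' for the default cluster, None otherwise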
class Thrasher:
    """
    Object used to thrash Ceph
    """
    def __init__(self, manager, config, logger=None):
        self.ceph_manager = manager
        self.cluster = manager.cluster
        self.ceph_manager.wait_for_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.live_osds = osd_status['live']
        self.out_osds = osd_status['out']
        self.dead_osds = osd_status['dead']
        self.stopping = False
        self.logger = logger
        self.config = config
        self.revive_timeout = self.config.get("revive_timeout", 360)
        self.pools_to_fix_pgp_num = set()
        if self.config.get('powercycle'):
            self.revive_timeout += 120
        self.clean_wait = self.config.get('clean_wait', 0)
        self.minin = self.config.get("min_in", 4)
        self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
        self.sighup_delay = self.config.get('sighup_delay')
        self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
        self.dump_ops_enable = self.config.get('dump_ops_enable')
        self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
        self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
        self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
        self.random_eio = self.config.get('random_eio')
        self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)

        num_osds = self.in_osds + self.out_osds
        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
        if self.logger is not None:
            self.log = lambda x: self.logger.info(x)
        else:
            def tmp(x):
                """
                Implement log behavior
                """
                print x
            self.log = tmp
        if self.config is None:
            self.config = dict()
        # prevent monitor from auto-marking things out while thrasher runs
        # try both old and new tell syntax, in case we are testing old code
        self.saved_options = []
        # assuming that the default settings do not vary from one daemon to
        # another
        first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
        opts = [('mon', 'mon_osd_down_out_interval', 0)]
        for service, opt, new_value in opts:
            old_value = manager.get_config(first_mon[0], first_mon[1], opt)
            self.saved_options.append((service, opt, old_value))
            self._set_config(service, '*', opt, new_value)
        # initialize ceph_objectstore_tool property - must be done before
        # do_thrash is spawned - http://tracker.ceph.com/issues/18799
        if (self.config.get('powercycle') or
                not self.cmd_exists_on_osds("ceph-objectstore-tool") or
                self.config.get('disable_objectstore_tool_tests', False)):
            self.ceph_objectstore_tool = False
            self.test_rm_past_intervals = False
            if self.config.get('powercycle'):
                self.log("Unable to test ceph-objectstore-tool, "
                         "powercycle testing")
            else:
                self.log("Unable to test ceph-objectstore-tool, "
                         "not available on all OSD nodes")
        else:
            self.ceph_objectstore_tool = \
                self.config.get('ceph_objectstore_tool', True)
            self.test_rm_past_intervals = \
                self.config.get('test_rm_past_intervals', True)

        self.thread = gevent.spawn(self.do_thrash)
        if self.sighup_delay:
            self.sighup_thread = gevent.spawn(self.do_sighup)
        if self.optrack_toggle_delay:
            self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
        if self.dump_ops_enable == "true":
            self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
        if self.noscrub_toggle_delay:
            self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
    def _set_config(self, service_type, service_id, name, value):
        opt_arg = '--{name} {value}'.format(name=name, value=value)
        whom = '.'.join([service_type, service_id])
        self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
                                          'injectargs', opt_arg)

    def cmd_exists_on_osds(self, cmd):
        allremotes = self.ceph_manager.ctx.cluster.only(
            teuthology.is_type('osd', self.cluster)).remotes.keys()
        allremotes = list(set(allremotes))
        for remote in allremotes:
            proc = remote.run(args=['type', cmd], wait=True,
                              check_status=False, stdout=StringIO(),
                              stderr=StringIO())
            if proc.exitstatus != 0:
                return False
        return True
    def kill_osd(self, osd=None, mark_down=False, mark_out=False):
        """
        :param osd: Osd to be killed.
        :mark_down: Mark down if true.
        :mark_out: Mark out if true.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s" % (str(osd),
                                                       str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.kill_osd(osd)
        if mark_down:
            self.ceph_manager.mark_down_osd(osd)
        if mark_out and osd in self.in_osds:
            self.out_osd(osd)
        if self.ceph_objectstore_tool:
            self.log("Testing ceph-objectstore-tool on down osd")
            remote = self.ceph_manager.find_remote('osd', osd)
            FSPATH = self.ceph_manager.get_filepath()
            JPATH = os.path.join(FSPATH, "journal")
            exp_osd = imp_osd = osd
            exp_remote = imp_remote = remote
            # If an older osd is available we'll move a pg from there
            if (len(self.dead_osds) > 1 and
                    random.random() < self.chance_move_pg):
                exp_osd = random.choice(self.dead_osds[:-1])
                exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
            if ('keyvaluestore_backend' in
                    self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
                          "--data-path {fpath} --journal-path {jpath} "
                          "--type keyvaluestore "
                          "--log-file="
                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
                          format(fpath=FSPATH, jpath=JPATH))
            else:
                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
                          "--data-path {fpath} --journal-path {jpath} "
                          "--log-file="
                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
                          format(fpath=FSPATH, jpath=JPATH))
            cmd = (prefix + "--op list-pgs").format(id=exp_osd)

            # ceph-objectstore-tool might be temporarily absent during an
            # upgrade - see http://tracker.ceph.com/issues/18014
            with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
                while proceed():
                    proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
                                          wait=True, check_status=False, stdout=StringIO(),
                                          stderr=StringIO())
                    if proc.exitstatus == 0:
                        break
                    log.debug("ceph-objectstore-tool binary not present, trying again")

            # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
            # see http://tracker.ceph.com/issues/19556
            with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
                while proceed():
                    proc = exp_remote.run(args=cmd, wait=True,
                                          check_status=False,
                                          stdout=StringIO(), stderr=StringIO())
                    if proc.exitstatus == 0:
                        break
                    elif proc.exitstatus == 1 and proc.stderr == "OSD has the store locked":
                        continue
                    else:
                        raise Exception("ceph-objectstore-tool: "
                                        "exp list-pgs failure with status {ret}".
                                        format(ret=proc.exitstatus))

            pgs = proc.stdout.getvalue().split('\n')[:-1]
            if len(pgs) == 0:
                self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
                return
            pg = random.choice(pgs)
            exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
            exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
            exp_path = os.path.join(exp_path,
                                    "exp.{pg}.{id}".format(
                                        pg=pg,
                                        id=exp_osd))
            # export
            # Can't use new export-remove op since this is part of upgrade testing
            cmd = prefix + "--op export --pgid {pg} --file {file}"
            cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
            proc = exp_remote.run(args=cmd)
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "export failure with status {ret}".
                                format(ret=proc.exitstatus))
            # remove
            cmd = prefix + "--force --op remove --pgid {pg}"
            cmd = cmd.format(id=exp_osd, pg=pg)
            proc = exp_remote.run(args=cmd)
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "remove failure with status {ret}".
                                format(ret=proc.exitstatus))
            # If there are at least 2 dead osds we might move the pg
            if exp_osd != imp_osd:
                # If pg isn't already on this osd, then we will move it there
                cmd = (prefix + "--op list-pgs").format(id=imp_osd)
                proc = imp_remote.run(args=cmd, wait=True,
                                      check_status=False, stdout=StringIO())
                if proc.exitstatus:
                    raise Exception("ceph-objectstore-tool: "
                                    "imp list-pgs failure with status {ret}".
                                    format(ret=proc.exitstatus))
                pgs = proc.stdout.getvalue().split('\n')[:-1]
                if pg not in pgs:
                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
                             format(pg=pg, fosd=exp_osd, tosd=imp_osd))
                    if imp_remote != exp_remote:
                        # Copy export file to the other machine
                        self.log("Transfer export file from {srem} to {trem}".
                                 format(srem=exp_remote, trem=imp_remote))
                        tmpexport = Remote.get_file(exp_remote, exp_path)
                        Remote.put_file(imp_remote, tmpexport, exp_path)
                        os.remove(tmpexport)
                else:
                    # Can't move the pg after all
                    imp_osd = exp_osd
                    imp_remote = exp_remote
            # import
            cmd = (prefix + "--op import --file {file}")
            cmd = cmd.format(id=imp_osd, file=exp_path)
            proc = imp_remote.run(args=cmd, wait=True, check_status=False,
                                  stderr=StringIO())
            if proc.exitstatus == 1:
                bogosity = "The OSD you are using is older than the exported PG"
                if bogosity in proc.stderr.getvalue():
                    self.log("OSD older than exported PG"
                             " ...ignored")
            elif proc.exitstatus == 10:
                self.log("Pool went away before processing an import"
                         " ...ignored")
            elif proc.exitstatus == 11:
                self.log("Attempt to import an incompatible export"
                         " ...ignored")
            elif proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "import failure with status {ret}".
                                format(ret=proc.exitstatus))
            cmd = "rm -f {file}".format(file=exp_path)
            exp_remote.run(args=cmd)
            if imp_remote != exp_remote:
                imp_remote.run(args=cmd)

            # apply low split settings to each pool
            for pool in self.ceph_manager.list_pools():
                no_sudo_prefix = prefix[5:]
                cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
                       "--filestore-split-multiple 1' sudo -E "
                       + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
                proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
                output = proc.stderr.getvalue()
                if 'Couldn\'t find pool' in output:
                    continue
                if proc.exitstatus:
                    raise Exception("ceph-objectstore-tool apply-layout-settings"
                                    " failed with {status}".format(status=proc.exitstatus))
    def rm_past_intervals(self, osd=None):
        """
        :param osd: Osd to find pg to remove past intervals
        """
        if self.test_rm_past_intervals:
            if osd is None:
                osd = random.choice(self.dead_osds)
            self.log("Use ceph_objectstore_tool to remove past intervals")
            remote = self.ceph_manager.find_remote('osd', osd)
            FSPATH = self.ceph_manager.get_filepath()
            JPATH = os.path.join(FSPATH, "journal")
            if ('keyvaluestore_backend' in
                    self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
                          "--data-path {fpath} --journal-path {jpath} "
                          "--type keyvaluestore "
                          "--log-file="
                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
                          format(fpath=FSPATH, jpath=JPATH))
            else:
                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
                          "--data-path {fpath} --journal-path {jpath} "
                          "--log-file="
                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
                          format(fpath=FSPATH, jpath=JPATH))
            cmd = (prefix + "--op list-pgs").format(id=osd)
            proc = remote.run(args=cmd, wait=True,
                              check_status=False, stdout=StringIO())
            if proc.exitstatus:
                raise Exception("ceph_objectstore_tool: "
                                "exp list-pgs failure with status {ret}".
                                format(ret=proc.exitstatus))
            pgs = proc.stdout.getvalue().split('\n')[:-1]
            if len(pgs) == 0:
                self.log("No PGs found for osd.{osd}".format(osd=osd))
                return
            pg = random.choice(pgs)
            cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
                format(id=osd, pg=pg)
            proc = remote.run(args=cmd)
            if proc.exitstatus:
                raise Exception("ceph_objectstore_tool: "
                                "rm-past-intervals failure with status {ret}".
                                format(ret=proc.exitstatus))
    def blackhole_kill_osd(self, osd=None):
        """
        If all else fails, kill the osd.
        :param osd: Osd to be killed.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Blackholing and then killing osd %s, live_osds are %s" %
                 (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.blackhole_kill_osd(osd)

    def revive_osd(self, osd=None, skip_admin_check=False):
        """
        Revive the osd.
        :param osd: Osd to be revived.
        """
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % (str(osd),))
        self.ceph_manager.revive_osd(
            osd,
            self.revive_timeout,
            skip_admin_check=skip_admin_check)
        self.dead_osds.remove(osd)
        self.live_osds.append(osd)
        if self.random_eio > 0 and osd is self.rerrosd:
            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
                          'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
                          'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
    def out_osd(self, osd=None):
        """
        Mark the osd out.
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s" %
                 (str(osd), str(self.in_osds)))
        self.ceph_manager.mark_out_osd(osd)
        self.in_osds.remove(osd)
        self.out_osds.append(osd)

    def in_osd(self, osd=None):
        """
        Mark the osd in.
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.out_osds)
        if osd in self.dead_osds:
            return self.revive_osd(osd)
        self.log("Adding osd %s" % (str(osd),))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)
        self.log("Added osd %s" % (str(osd),))
    def reweight_osd_or_by_util(self, osd=None):
        """
        Reweight an osd that is in
        :param osd: Osd to be marked.
        """
        if osd is not None or random.choice([True, False]):
            if osd is None:
                osd = random.choice(self.in_osds)
            val = random.uniform(.1, 1.0)
            self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(val))
        else:
            # do it several times, the option space is large
            for i in range(5):
                options = {
                    'max_change': random.choice(['0.05', '1.0', '3.0']),
                    'overage': random.choice(['110', '1000']),
                    'type': random.choice([
                        'reweight-by-utilization',
                        'test-reweight-by-utilization']),
                }
                self.log("Reweighting by: %s" % (str(options),))
                self.ceph_manager.raw_cluster_cmd(
                    'osd',
                    options['type'],
                    options['overage'],
                    options['max_change'])

    def primary_affinity(self, osd=None):
        if osd is None:
            osd = random.choice(self.in_osds)
        if random.random() >= .5:
            pa = random.random()
        elif random.random() >= .5:
            pa = 1
        else:
            pa = 0
        self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                          str(osd), str(pa))
    def thrash_cluster_full(self):
        """
        Set and unset cluster full condition
        """
        self.log('Setting full ratio to .001')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
        time.sleep(1)
        self.log('Setting full ratio back to .95')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')

    def thrash_pg_upmap(self):
        """
        Install or remove random pg_upmap entries in OSDMap
        """
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap, ignoring')
    def thrash_pg_upmap_items(self):
        """
        Install or remove random pg_upmap_items entries in OSDMap
        """
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n*2]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap_items']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap-items',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap-items, ignoring')
    def force_recovery(self):
        """
        Force recovery on some of PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to force backfill|recovery, ignoring')

    def cancel_force_recovery(self):
        """
        Cancel forced recovery on some of PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to force backfill|recovery, ignoring')

    def force_cancel_recovery(self):
        """
        Force or cancel forcing recovery
        """
        if random.random() >= 0.4:
            self.force_recovery()
        else:
            self.cancel_force_recovery()
    def all_up(self):
        """
        Make sure all osds are up and not out.
        """
        while len(self.dead_osds) > 0:
            self.log("reviving osd")
            self.revive_osd()
        while len(self.out_osds) > 0:
            self.log("inning osd")
            self.in_osd()

    def all_up_in(self):
        """
        Make sure all osds are up and fully in.
        """
        self.all_up()
        for osd in self.live_osds:
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(1))
            self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                              str(osd), str(1))

    def do_join(self):
        """
        Break out of this Ceph loop
        """
        self.stopping = True
        self.thread.get()
        if self.sighup_delay:
            self.log("joining the do_sighup greenlet")
            self.sighup_thread.get()
        if self.optrack_toggle_delay:
            self.log("joining the do_optrack_toggle greenlet")
            self.optrack_toggle_thread.join()
        if self.dump_ops_enable == "true":
            self.log("joining the do_dump_ops greenlet")
            self.dump_ops_thread.join()
        if self.noscrub_toggle_delay:
            self.log("joining the do_noscrub_toggle greenlet")
            self.noscrub_toggle_thread.join()
    def grow_pool(self):
        """
        Increase the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
        self.log("Growing pool %s" % (pool,))
        if self.ceph_manager.expand_pool(pool,
                                         self.config.get('pool_grow_by', 10),
                                         self.max_pgs):
            self.pools_to_fix_pgp_num.add(pool)

    def fix_pgp_num(self, pool=None):
        """
        Fix number of pgs in pool.
        """
        if pool is None:
            pool = self.ceph_manager.get_pool()
            force = False
        else:
            force = True
        self.log("fixing pg num pool %s" % (pool,))
        if self.ceph_manager.set_pool_pgpnum(pool, force):
            self.pools_to_fix_pgp_num.discard(pool)
    def test_pool_min_size(self):
        """
        Kill and revive all osds except one.
        """
        self.log("test_pool_min_size")
        self.all_up()
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )
        the_one = random.choice(self.in_osds)
        self.log("Killing everyone but %s", the_one)
        to_kill = filter(lambda x: x != the_one, self.in_osds)
        [self.kill_osd(i) for i in to_kill]
        [self.out_osd(i) for i in to_kill]
        time.sleep(self.config.get("test_pool_min_size_time", 10))
        self.log("Killing %s" % (the_one,))
        self.kill_osd(the_one)
        self.out_osd(the_one)
        self.log("Reviving everyone but %s" % (the_one,))
        [self.revive_osd(i) for i in to_kill]
        [self.in_osd(i) for i in to_kill]
        self.log("Revived everyone but %s" % (the_one,))
        self.log("Waiting for clean")
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )
    def inject_pause(self, conf_key, duration, check_after, should_be_down):
        """
        Pause injection testing. Check for osd being down when finished.
        """
        the_one = random.choice(self.live_osds)
        self.log("inject_pause on {osd}".format(osd=the_one))
        self.log(
            "Testing {key} pause injection for duration {duration}".format(
                key=conf_key,
                duration=duration
                ))
        self.log(
            "Checking after {after}, should_be_down={shouldbedown}".format(
                after=check_after,
                shouldbedown=should_be_down
                ))
        self.ceph_manager.set_config(the_one, **{conf_key: duration})
        if not should_be_down:
            return
        time.sleep(check_after)
        status = self.ceph_manager.get_osd_status()
        assert the_one in status['down']
        time.sleep(duration - check_after + 20)
        status = self.ceph_manager.get_osd_status()
        assert not the_one in status['down']
    def test_backfill_full(self):
        """
        Test backfills stopping when the replica fills up.

        First, use injectfull admin command to simulate a now full
        osd by setting it to 0 on all of the OSDs.

        Second, on a random subset, set
        osd_debug_skip_full_check_in_backfill_reservation to force
        the more complicated check in do_scan to be exercised.

        Then, verify that all backfillings stop.
        """
        self.log("injecting backfill full")
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation=
                random.choice(['false', 'true']))
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
        for i in range(30):
            status = self.ceph_manager.compile_pg_status()
            if 'backfilling' not in status.keys():
                break
            self.log(
                "waiting for {still_going} backfillings".format(
                    still_going=status.get('backfilling')))
            time.sleep(1)
        assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation='false')
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
    def test_map_discontinuity(self):
        """
        1) Allows the osds to recover
        2) kills an osd
        3) allows the remaining osds to recover
        4) waits for some time
        5) revives the osd
        This sequence should cause the revived osd to have to handle
        a map gap since the mons would have trimmed
        """
        while len(self.in_osds) < (self.minin + 1):
            self.in_osd()
        self.log("Waiting for recovery")
        self.ceph_manager.wait_for_all_osds_up(
            timeout=self.config.get('timeout')
            )
        # now we wait 20s for the pg status to change, if it takes longer,
        # the test *should* fail!
        time.sleep(20)
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
            )

        # now we wait 20s for the backfill replicas to hear about the clean
        time.sleep(20)
        self.log("Recovered, killing an osd")
        self.kill_osd(mark_down=True, mark_out=True)
        self.log("Waiting for clean again")
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
            )
        self.log("Waiting for trim")
        time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
        self.revive_osd()
    def choose_action(self):
        """
        Random action selector.
        """
        chance_down = self.config.get('chance_down', 0.4)
        chance_test_min_size = self.config.get('chance_test_min_size', 0)
        chance_test_backfill_full = \
            self.config.get('chance_test_backfill_full', 0)
        if isinstance(chance_down, int):
            chance_down = float(chance_down) / 100
        minin = self.minin
        minout = self.config.get("min_out", 0)
        minlive = self.config.get("min_live", 2)
        mindead = self.config.get("min_dead", 0)

        self.log('choose_action: min_in %d min_out '
                 '%d min_live %d min_dead %d' %
                 (minin, minout, minlive, mindead))
        actions = []
        if len(self.in_osds) > minin:
            actions.append((self.out_osd, 1.0,))
        if len(self.live_osds) > minlive and chance_down > 0:
            actions.append((self.kill_osd, chance_down,))
        if len(self.dead_osds) > 1:
            actions.append((self.rm_past_intervals, 1.0,))
        if len(self.out_osds) > minout:
            actions.append((self.in_osd, 1.7,))
        if len(self.dead_osds) > mindead:
            actions.append((self.revive_osd, 1.0,))
        if self.config.get('thrash_primary_affinity', True):
            actions.append((self.primary_affinity, 1.0,))
        actions.append((self.reweight_osd_or_by_util,
                        self.config.get('reweight_osd', .5),))
        actions.append((self.grow_pool,
                        self.config.get('chance_pgnum_grow', 0),))
        actions.append((self.fix_pgp_num,
                        self.config.get('chance_pgpnum_fix', 0),))
        actions.append((self.test_pool_min_size,
                        chance_test_min_size,))
        actions.append((self.test_backfill_full,
                        chance_test_backfill_full,))
        if self.chance_thrash_cluster_full > 0:
            actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
        if self.chance_thrash_pg_upmap > 0:
            actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
        if self.chance_thrash_pg_upmap_items > 0:
            actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
        if self.chance_force_recovery > 0:
            actions.append((self.force_cancel_recovery, self.chance_force_recovery))

        for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
            for scenario in [
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_short', 3),
                                   0,
                                   False),
                 self.config.get('chance_inject_pause_short', 1),),
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_long', 80),
                                   self.config.get('pause_check_after', 70),
                                   True),
                 self.config.get('chance_inject_pause_long', 0),)]:
                actions.append(scenario)

        total = sum([y for (x, y) in actions])
        val = random.uniform(0, total)
        for (action, prob) in actions:
            if val < prob:
                return action
            val -= prob
        return None

    def log_exc(func):
        @wraps(func)
        def wrapper(self):
            try:
                return func(self)
            except:
                self.log(traceback.format_exc())
                raise
        return wrapper
    @log_exc
    def do_sighup(self):
        """
        Loops and sends signal.SIGHUP to a random live osd.

        Loop delay is controlled by the config value sighup_delay.
        """
        delay = float(self.sighup_delay)
        self.log("starting do_sighup with a delay of {0}".format(delay))
        while not self.stopping:
            osd = random.choice(self.live_osds)
            self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
            time.sleep(delay)

    @log_exc
    def do_optrack_toggle(self):
        """
        Loops and toggle op tracking to all osds.

        Loop delay is controlled by the config value optrack_toggle_delay.
        """
        delay = float(self.optrack_toggle_delay)
        osd_state = "true"
        self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if osd_state == "true":
                osd_state = "false"
            else:
                osd_state = "true"
            self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
                          'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
            gevent.sleep(delay)

    @log_exc
    def do_dump_ops(self):
        """
        Loops and does op dumps on all osds
        """
        self.log("starting do_dump_ops")
        while not self.stopping:
            for osd in self.live_osds:
                # Ignore errors because live_osds is in flux
                self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
            gevent.sleep(0)

    @log_exc
    def do_noscrub_toggle(self):
        """
        Loops and toggle noscrub flags

        Loop delay is controlled by the config value noscrub_toggle_delay.
        """
        delay = float(self.noscrub_toggle_delay)
        scrub_state = "none"
        self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if scrub_state == "none":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
                scrub_state = "noscrub"
            elif scrub_state == "noscrub":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
                scrub_state = "both"
            elif scrub_state == "both":
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
                scrub_state = "nodeep-scrub"
            else:
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
                scrub_state = "none"
            gevent.sleep(delay)
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
    @log_exc
    def do_thrash(self):
        """
        Loop to select random actions to thrash ceph manager with.
        """
        cleanint = self.config.get("clean_interval", 60)
        scrubint = self.config.get("scrub_interval", -1)
        maxdead = self.config.get("max_dead", 0)
        delay = self.config.get("op_delay", 5)
        self.rerrosd = self.live_osds[0]
        if self.random_eio > 0:
            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
                          'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
                          'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
        self.log("starting do_thrash")
        while not self.stopping:
            to_log = [str(x) for x in ["in_osds: ", self.in_osds,
                                       "out_osds: ", self.out_osds,
                                       "dead_osds: ", self.dead_osds,
                                       "live_osds: ", self.live_osds]]
            self.log(" ".join(to_log))
            if random.uniform(0, 1) < (float(delay) / cleanint):
                while len(self.dead_osds) > maxdead:
                    self.revive_osd()
                for osd in self.in_osds:
                    self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                      str(osd), str(1))
                if random.uniform(0, 1) < float(
                        self.config.get('chance_test_map_discontinuity', 0)):
                    self.test_map_discontinuity()
                else:
                    self.ceph_manager.wait_for_recovery(
                        timeout=self.config.get('timeout')
                        )
                time.sleep(self.clean_wait)
                if scrubint > 0:
                    if random.uniform(0, 1) < (float(delay) / scrubint):
                        self.log('Scrubbing while thrashing being performed')
                        Scrubber(self.ceph_manager, self.config)
            self.choose_action()()
            time.sleep(delay)
        self.all_up()
        if self.random_eio > 0:
            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
                          'injectargs', '--', '--filestore_debug_random_read_err=0.0')
            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
                          'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
        for pool in list(self.pools_to_fix_pgp_num):
            if self.ceph_manager.get_pool_pg_num(pool) > 0:
                self.fix_pgp_num(pool)
        self.pools_to_fix_pgp_num.clear()
        for service, opt, saved_value in self.saved_options:
            self._set_config(service, '*', opt, saved_value)
        self.saved_options = []
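

# Illustrative sketch (not part of the original module): Thrasher.choose_action() builds a
# list of (callable, weight) pairs and then samples one entry with probability proportional
# to its weight.  The standalone helper below shows that selection logic on its own; the
# action names in the usage comment are invented for the example.
def _example_weighted_choice(actions):
    # actions: list of (value, weight) pairs with non-negative weights
    total = sum(weight for _, weight in actions)
    val = random.uniform(0, total)
    for value, weight in actions:
        if val < weight:
            return value
        val -= weight
    return None

# e.g. _example_weighted_choice([('out_osd', 1.0), ('kill_osd', 0.4), ('in_osd', 1.7)])
# returns 'kill_osd' roughly 0.4 / 3.1 of the time.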
class ObjectStoreTool:

    def __init__(self, manager, pool, **kwargs):
        self.manager = manager
        self.pool = pool
        self.osd = kwargs.get('osd', None)
        self.object_name = kwargs.get('object_name', None)
        self.do_revive = kwargs.get('do_revive', True)
        if self.osd and self.pool and self.object_name:
            if self.osd == "primary":
                self.osd = self.manager.get_object_primary(self.pool,
                                                           self.object_name)
        assert self.osd is not None
        if self.object_name:
            self.pgid = self.manager.get_object_pg_with_shard(self.pool,
                                                              self.object_name,
                                                              self.osd)
        self.remote = self.manager.ctx.\
            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
        path = self.manager.get_filepath().format(id=self.osd)
        self.paths = ("--data-path {path} --journal-path {path}/journal".
                      format(path=path))

    def build_cmd(self, options, args, stdin):
        lines = []
        if self.object_name:
            lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
                         "{paths} --pgid {pgid} --op list |"
                         "grep '\"oid\":\"{name}\"')".
                         format(paths=self.paths,
                                pgid=self.pgid,
                                name=self.object_name))
            args = '"$object" ' + args
            options += " --pgid {pgid}".format(pgid=self.pgid)
        cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
               format(paths=self.paths,
                      args=args,
                      options=options))
        if stdin:
            cmd = ("echo {payload} | base64 --decode | {cmd}".
                   format(payload=base64.b64encode(stdin),
                          cmd=cmd))
        lines.append(cmd)
        return "\n".join(lines)

    def run(self, options, args, stdin=None, stdout=None):
        if stdout is None:
            stdout = StringIO()
        self.manager.kill_osd(self.osd)
        cmd = self.build_cmd(options, args, stdin)
        self.manager.log(cmd)
        try:
            proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
                                   check_status=False,
                                   stdout=stdout,
                                   stderr=StringIO())
            proc.wait()
            if proc.exitstatus != 0:
                self.manager.log("failed with " + str(proc.exitstatus))
                error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
                raise Exception(error)
        finally:
            if self.do_revive:
                self.manager.revive_osd(self.osd)
                self.manager.wait_till_osd_is_up(self.osd, 300)
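

# Illustrative sketch (not part of the original module): ObjectStoreTool.build_cmd() smuggles
# arbitrary stdin through a remote shell by base64-encoding it locally and piping it through
# "base64 --decode" on the far end, which sidesteps quoting problems.  The snippet below shows
# the same round trip against a local shell.
def _example_base64_via_shell(payload='any bytes: $PATH "quotes"\n'):
    import subprocess
    encoded = base64.b64encode(payload)
    cmd = "echo {payload} | base64 --decode | cat".format(payload=encoded)
    return subprocess.check_output(['bash', '-c', cmd])  # returns the original payload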
class CephManager:
    """
    Ceph manager object.
    Contains several local functions that form a bulk of this module.

    Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
    the same name.
    """

    REPLICATED_POOL = 1
    ERASURE_CODED_POOL = 3

    def __init__(self, controller, ctx=None, config=None, logger=None,
                 cluster='ceph'):
        self.lock = threading.RLock()
        self.ctx = ctx
        self.config = config
        self.controller = controller
        self.next_pool_id = 0
        self.cluster = cluster
        if (logger):
            self.log = lambda x: logger.info(x)
        else:
            def tmp(x):
                """
                implement log behavior.
                """
                print x
            self.log = tmp
        if self.config is None:
            self.config = dict()
        pools = self.list_pools()
        self.pools = {}
        for pool in pools:
            # we may race with a pool deletion; ignore failures here
            try:
                self.pools[pool] = self.get_pool_property(pool, 'pg_num')
            except CommandFailedError:
                self.log('Failed to get pg_num from pool %s, ignoring' % pool)

    def raw_cluster_cmd(self, *args):
        """
        Start ceph on a raw cluster.  Return count
        """
        testdir = teuthology.get_testdir(self.ctx)
        ceph_args = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'timeout',
            '120',
            'ceph',
            '--cluster',
            self.cluster,
        ]
        ceph_args.extend(args)
        proc = self.controller.run(
            args=ceph_args,
            stdout=StringIO(),
            )
        return proc.stdout.getvalue()
    def raw_cluster_cmd_result(self, *args):
        """
        Start ceph on a cluster.  Return success or failure information.
        """
        testdir = teuthology.get_testdir(self.ctx)
        ceph_args = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'timeout',
            '120',
            'ceph',
            '--cluster',
            self.cluster,
        ]
        ceph_args.extend(args)
        proc = self.controller.run(
            args=ceph_args,
            check_status=False,
            )
        return proc.exitstatus

    def run_ceph_w(self):
        """
        Execute "ceph -w" in the background with stdout connected to a StringIO,
        and return the RemoteProcess.
        """
        return self.controller.run(
            args=['sudo',
                  'daemon-helper',
                  'kill',
                  'ceph',
                  '--cluster',
                  self.cluster,
                  '-w'],
            wait=False, stdout=StringIO(), stdin=run.PIPE)

    def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
        """
        Flush pg stats from a list of OSD ids, ensuring they are reflected
        all the way to the monitor.  Luminous and later only.

        :param osds: list of OSDs to flush
        :param no_wait: list of OSDs not to wait for seq id. by default, we
                        wait for all specified osds, but some of them could be
                        moved out of osdmap, so we cannot get their updated
                        stat seq from monitor anymore. in that case, you need
                        to pass a blacklist.
        :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
                             it. (5 min by default)
        """
        seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
               for osd in osds}
        if not wait_for_mon:
            return
        if no_wait is None:
            no_wait = []
        for osd, need in seq.iteritems():
            if osd in no_wait:
                continue
            got = 0
            while wait_for_mon > 0:
                got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
                self.log('need seq {need} got {got} for osd.{osd}'.format(
                    need=need, got=got, osd=osd))
                if got >= need:
                    break
                A_WHILE = 1
                time.sleep(A_WHILE)
                wait_for_mon -= A_WHILE
            else:
                raise Exception('timed out waiting for mon to be updated with '
                                'osd.{osd}: {got} < {need}'.
                                format(osd=osd, got=got, need=need))
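
    # Illustrative note (not part of the original module): flush_pg_stats() relies on
    # Python's while/else - the "else" branch only runs when the loop exhausts its budget
    # without hitting "break", which is what turns a quiet timeout into an exception, e.g.:
    #
    #     budget = 300
    #     while budget > 0:
    #         if sequence_has_caught_up():   # hypothetical check
    #             break
    #         time.sleep(1)
    #         budget -= 1
    #     else:
    #         raise Exception('timed out')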
    def flush_all_pg_stats(self):
        self.flush_pg_stats(range(len(self.get_osd_dump())))

    def do_rados(self, remote, cmd, check_status=True):
        """
        Execute a remote rados command.
        """
        testdir = teuthology.get_testdir(self.ctx)
        pre = [
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'rados',
            '--cluster',
            self.cluster,
            ]
        pre.extend(cmd)
        proc = remote.run(
            args=pre,
            wait=True,
            check_status=check_status
            )
        return proc

    def rados_write_objects(self, pool, num_objects, size,
                            timelimit, threads, cleanup=False):
        """
        Write rados objects
        Threads not used yet.
        """
        args = [
            '-p', pool,
            '--num-objects', num_objects,
            '-b', size,
            'bench', timelimit,
            'write'
            ]
        if not cleanup:
            args.append('--no-cleanup')
        return self.do_rados(self.controller, map(str, args))

    def do_put(self, pool, obj, fname, namespace=None):
        """
        Implement rados put operation
        """
        args = ['-p', pool]
        if namespace is not None:
            args += ['-N', namespace]
        args += [
            'put',
            obj,
            fname
        ]
        return self.do_rados(
            self.controller,
            args,
            check_status=False
        ).exitstatus

    def do_get(self, pool, obj, fname='/dev/null', namespace=None):
        """
        Implement rados get operation
        """
        args = ['-p', pool]
        if namespace is not None:
            args += ['-N', namespace]
        args += [
            'get',
            obj,
            fname
        ]
        return self.do_rados(
            self.controller,
            args,
            check_status=False
        ).exitstatus

    def do_rm(self, pool, obj, namespace=None):
        """
        Implement rados rm operation
        """
        args = ['-p', pool]
        if namespace is not None:
            args += ['-N', namespace]
        args += [
            'rm',
            obj
        ]
        return self.do_rados(
            self.controller,
            args,
            check_status=False
        ).exitstatus

    def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
        return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)

    def find_remote(self, service_type, service_id):
        """
        Get the Remote for the host where a particular service runs.

        :param service_type: 'mds', 'osd', 'client'
        :param service_id: The second part of a role, e.g. '0' for
                           the role 'client.0'
        :return: a Remote instance for the host where the
                 requested role is placed
        """
        return get_remote(self.ctx, self.cluster,
                          service_type, service_id)
    def admin_socket(self, service_type, service_id,
                     command, check_status=True, timeout=0, stdout=None):
        """
        Remotely start up ceph specifying the admin socket
        :param command: a list of words to use as the command
                        to the admin socket
        """
        if stdout is None:
            stdout = StringIO()
        testdir = teuthology.get_testdir(self.ctx)
        remote = self.find_remote(service_type, service_id)
        args = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'timeout',
            str(timeout),
            'ceph',
            '--cluster',
            self.cluster,
            '--admin-daemon',
            '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
                cluster=self.cluster,
                type=service_type,
                id=service_id),
            ]
        args.extend(command)
        return remote.run(
            args=args,
            stdout=stdout,
            wait=True,
            check_status=check_status
            )

    def objectstore_tool(self, pool, options, args, **kwargs):
        return ObjectStoreTool(self, pool, **kwargs).run(options, args)

    def get_pgid(self, pool, pgnum):
        """
        :param pool: pool name
        :param pgnum: pg number
        :returns: a string representing this pg.
        """
        poolnum = self.get_pool_num(pool)
        pg_str = "{poolnum}.{pgnum}".format(
            poolnum=poolnum,
            pgnum=pgnum)
        return pg_str

    def get_pg_replica(self, pool, pgnum):
        """
        get replica for pool, pgnum (e.g. (data, 0)->0
        """
        pg_str = self.get_pgid(pool, pgnum)
        output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
        j = json.loads('\n'.join(output.split('\n')[1:]))
        return int(j['acting'][-1])

    def wait_for_pg_stats(func):
        # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
        # by default, and take the faulty injection in ms into consideration,
        # 12 seconds are more than enough
        delays = [1, 1, 2, 3, 5, 8, 13, 0]
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            exc = None
            for delay in delays:
                try:
                    return func(self, *args, **kwargs)
                except AssertionError as e:
                    time.sleep(delay)
                    exc = e
            raise exc
        return wrapper
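
    # Illustrative note (not part of the original module): wait_for_pg_stats() is a decorator
    # that retries an assertion-based check with increasing delays (1, 1, 2, 3, 5, 8, 13
    # seconds), because pg stats only reach the mgr/mon every few seconds.  Usage shape:
    #
    #     @wait_for_pg_stats
    #     def get_pg_primary(self, pool, pgnum):
    #         ...  # may raise AssertionError until the stats catch up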
    @wait_for_pg_stats
    def get_pg_primary(self, pool, pgnum):
        """
        get primary for pool, pgnum (e.g. (data, 0)->0
        """
        pg_str = self.get_pgid(pool, pgnum)
        output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
        j = json.loads('\n'.join(output.split('\n')[1:]))
        return int(j['acting'][0])

    def get_pool_num(self, pool):
        """
        get number for pool (e.g., data -> 2)
        """
        return int(self.get_pool_dump(pool)['pool'])

    def list_pools(self):
        """
        list all pool names
        """
        osd_dump = self.get_osd_dump_json()
        self.log(osd_dump['pools'])
        return [str(i['pool_name']) for i in osd_dump['pools']]

    def clear_pools(self):
        """
        remove all pools
        """
        [self.remove_pool(i) for i in self.list_pools()]

    def kick_recovery_wq(self, osdnum):
        """
        Run kick_recovery_wq on cluster.
        """
        return self.raw_cluster_cmd(
            'tell', "osd.%d" % (int(osdnum),),
            'debug',
            'kick_recovery_wq',
            '0')

    def wait_run_admin_socket(self, service_type,
                              service_id, args=['version'], timeout=75, stdout=None):
        """
        If osd_admin_socket call succeeds, return.  Otherwise wait
        five seconds and try again.
        """
        if stdout is None:
            stdout = StringIO()
        tries = 0
        while True:
            proc = self.admin_socket(service_type, service_id,
                                     args, check_status=False, stdout=stdout)
            if proc.exitstatus is 0:
                return proc
            else:
                tries += 1
                if (tries * 5) > timeout:
                    raise Exception('timed out waiting for admin_socket '
                                    'to appear after {type}.{id} restart'.
                                    format(type=service_type,
                                           id=service_id))
                self.log("waiting on admin_socket for {type}-{id}, "
                         "{command}".format(type=service_type,
                                            id=service_id,
                                            command=args))
                time.sleep(5)
    def get_pool_dump(self, pool):
        """
        get the osd dump part of a pool
        """
        osd_dump = self.get_osd_dump_json()
        for i in osd_dump['pools']:
            if i['pool_name'] == pool:
                return i
        assert False

    def get_config(self, service_type, service_id, name):
        """
        :param node: like 'mon.a'
        :param name: the option name
        """
        proc = self.wait_run_admin_socket(service_type, service_id,
                                          ['config', 'show'])
        j = json.loads(proc.stdout.getvalue())
        return j[name]

    def set_config(self, osdnum, **argdict):
        """
        :param osdnum: osd number
        :param argdict: dictionary containing values to set.
        """
        for k, v in argdict.iteritems():
            self.wait_run_admin_socket(
                'osd', osdnum,
                ['config', 'set', str(k), str(v)])

    def raw_cluster_status(self):
        """
        Get status from cluster
        """
        status = self.raw_cluster_cmd('status', '--format=json-pretty')
        return json.loads(status)

    def raw_osd_status(self):
        """
        Get osd status from cluster
        """
        return self.raw_cluster_cmd('osd', 'dump')
    def get_osd_status(self):
        """
        Get osd statuses sorted by states that the osds are in.
        """
        osd_lines = filter(
            lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
            self.raw_osd_status().split('\n'))
        self.log(osd_lines)
        in_osds = [int(i[4:].split()[0])
                   for i in filter(lambda x: " in " in x, osd_lines)]
        out_osds = [int(i[4:].split()[0])
                    for i in filter(lambda x: " out " in x, osd_lines)]
        up_osds = [int(i[4:].split()[0])
                   for i in filter(lambda x: " up " in x, osd_lines)]
        down_osds = [int(i[4:].split()[0])
                     for i in filter(lambda x: " down " in x, osd_lines)]
        dead_osds = [int(x.id_)
                     for x in filter(lambda x:
                                     not x.running(),
                                     self.ctx.daemons.
                                     iter_daemons_of_role('osd', self.cluster))]
        live_osds = [int(x.id_) for x in
                     filter(lambda x:
                            x.running(),
                            self.ctx.daemons.iter_daemons_of_role('osd',
                                                                  self.cluster))]
        return {'in': in_osds, 'out': out_osds, 'up': up_osds,
                'down': down_osds, 'dead': dead_osds, 'live': live_osds,
                'raw': osd_lines}

    def get_num_pgs(self):
        """
        Check cluster status for the number of pgs
        """
        status = self.raw_cluster_status()
        self.log(status)
        return status['pgmap']['num_pgs']
    def create_erasure_code_profile(self, profile_name, profile):
        """
        Create an erasure code profile name that can be used as a parameter
        when creating an erasure coded pool.
        """
        with self.lock:
            args = cmd_erasure_code_profile(profile_name, profile)
            self.raw_cluster_cmd(*args)

    def create_pool_with_unique_name(self, pg_num=16,
                                     erasure_code_profile_name=None,
                                     min_size=None,
                                     erasure_code_use_overwrites=False):
        """
        Create a pool named unique_pool_X where X is unique.
        """
        name = ""
        with self.lock:
            name = "unique_pool_%s" % (str(self.next_pool_id),)
            self.next_pool_id += 1
            self.create_pool(
                name,
                pg_num,
                erasure_code_profile_name=erasure_code_profile_name,
                min_size=min_size,
                erasure_code_use_overwrites=erasure_code_use_overwrites)
        return name

    @contextlib.contextmanager
    def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
        self.create_pool(pool_name, pg_num, erasure_code_profile_name)
        yield
        self.remove_pool(pool_name)

    def create_pool(self, pool_name, pg_num=16,
                    erasure_code_profile_name=None,
                    min_size=None,
                    erasure_code_use_overwrites=False):
        """
        Create a pool named from the pool_name parameter.
        :param pool_name: name of the pool being created.
        :param pg_num: initial number of pgs.
        :param erasure_code_profile_name: if set and !None create an
                                          erasure coded pool using the profile
        :param erasure_code_use_overwrites: if true, allow overwrites
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert isinstance(pg_num, int)
            assert pool_name not in self.pools
            self.log("creating pool_name %s" % (pool_name,))
            if erasure_code_profile_name:
                self.raw_cluster_cmd('osd', 'pool', 'create',
                                     pool_name, str(pg_num), str(pg_num),
                                     'erasure', erasure_code_profile_name)
            else:
                self.raw_cluster_cmd('osd', 'pool', 'create',
                                     pool_name, str(pg_num))
            if min_size is not None:
                self.raw_cluster_cmd(
                    'osd', 'pool', 'set', pool_name,
                    'min_size',
                    str(min_size))
            if erasure_code_use_overwrites:
                self.raw_cluster_cmd(
                    'osd', 'pool', 'set', pool_name,
                    'allow_ec_overwrites',
                    'true')
            self.raw_cluster_cmd(
                'osd', 'pool', 'application', 'enable',
                pool_name, 'rados', '--yes-i-really-mean-it',
                run.Raw('||'), 'true')
            self.pools[pool_name] = pg_num
        time.sleep(1)
    def add_pool_snap(self, pool_name, snap_name):
        """
        Add pool snapshot
        :param pool_name: name of pool to snapshot
        :param snap_name: name of snapshot to take
        """
        self.raw_cluster_cmd('osd', 'pool', 'mksnap',
                             str(pool_name), str(snap_name))

    def remove_pool_snap(self, pool_name, snap_name):
        """
        Remove pool snapshot
        :param pool_name: name of pool to snapshot
        :param snap_name: name of snapshot to remove
        """
        self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
                             str(pool_name), str(snap_name))

    def remove_pool(self, pool_name):
        """
        Remove the indicated pool
        :param pool_name: Pool to be removed
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert pool_name in self.pools
            self.log("removing pool_name %s" % (pool_name,))
            del self.pools[pool_name]
            self.do_rados(self.controller,
                          ['rmpool', pool_name, pool_name,
                           "--yes-i-really-really-mean-it"])

    def get_pool(self):
        """
        Pick a random pool
        """
        with self.lock:
            return random.choice(self.pools.keys())

    def get_pool_pg_num(self, pool_name):
        """
        Return the number of pgs in the pool specified.
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            if pool_name in self.pools:
                return self.pools[pool_name]
        return 0
    def get_pool_property(self, pool_name, prop):
        """
        :param pool_name: pool
        :param prop: property to be checked.
        :returns: property as an int value.
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert isinstance(prop, basestring)
            output = self.raw_cluster_cmd(
                'osd',
                'pool',
                'get',
                pool_name,
                prop)
            return int(output.split()[1])

    def set_pool_property(self, pool_name, prop, val):
        """
        :param pool_name: pool
        :param prop: property to be set.
        :param val: value to set.

        This routine retries if set operation fails.
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert isinstance(prop, basestring)
            assert isinstance(val, int)
            tries = 0
            while True:
                r = self.raw_cluster_cmd_result(
                    'osd',
                    'pool',
                    'set',
                    pool_name,
                    prop,
                    str(val))
                if r != 11:  # EAGAIN
                    break
                tries += 1
                if tries > 50:
                    raise Exception('timed out getting EAGAIN '
                                    'when setting pool property %s %s = %s' %
                                    (pool_name, prop, val))
                self.log('got EAGAIN setting pool property, '
                         'waiting a few seconds...')
                time.sleep(2)

    def expand_pool(self, pool_name, by, max_pgs):
        """
        Increase the number of pgs in a pool
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert isinstance(by, int)
            assert pool_name in self.pools
            if self.get_num_creating() > 0:
                return False
            if (self.pools[pool_name] + by) > max_pgs:
                return False
            self.log("increase pool size by %d" % (by,))
            new_pg_num = self.pools[pool_name] + by
            self.set_pool_property(pool_name, "pg_num", new_pg_num)
            self.pools[pool_name] = new_pg_num
            return True

    def set_pool_pgpnum(self, pool_name, force):
        """
        Set pgpnum property of pool_name pool.
        """
        with self.lock:
            assert isinstance(pool_name, basestring)
            assert pool_name in self.pools
            if not force and self.get_num_creating() > 0:
                return False
            self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
            return True
    def list_pg_missing(self, pgid):
        """
        return list of missing pgs with the id specified
        """
        r = None
        offset = {}
        while True:
            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
                                       json.dumps(offset))
            j = json.loads(out)
            if r is None:
                r = j
            else:
                r['objects'].extend(j['objects'])
            if len(j['objects']) == 0:
                break
            offset = j['objects'][-1]['oid']
        return r
    def get_pg_stats(self):
        """
        Dump the cluster and get pg stats
        """
        out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        return j['pg_stats']

    def get_pgids_to_force(self, backfill):
        """
        Return the randomized list of PGs that can have their recovery/backfill forced
        """
        j = self.get_pg_stats()
        pgids = []
        if backfill:
            wanted = ['degraded', 'backfilling', 'backfill_wait']
        else:
            wanted = ['recovering', 'degraded', 'recovery_wait']
        for pg in j:
            status = pg['state'].split('+')
            for t in wanted:
                if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
                    pgids.append(pg['pgid'])
                    break
        return pgids

    def get_pgids_to_cancel_force(self, backfill):
        """
        Return the randomized list of PGs whose recovery/backfill priority is forced
        """
        j = self.get_pg_stats()
        pgids = []
        if backfill:
            wanted = 'forced_backfill'
        else:
            wanted = 'forced_recovery'
        for pg in j:
            status = pg['state'].split('+')
            if wanted in status and random.random() > 0.5:
                pgids.append(pg['pgid'])
        return pgids

    def compile_pg_status(self):
        """
        Return a histogram of pg state values
        """
        ret = {}
        j = self.get_pg_stats()
        for pg in j:
            for status in pg['state'].split('+'):
                if status not in ret:
                    ret[status] = 0
                ret[status] += 1
        return ret
    @wait_for_pg_stats
    def with_pg_state(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        assert(check(stats['state']))

    @wait_for_pg_stats
    def with_pg(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        return check(stats)

    def get_last_scrub_stamp(self, pool, pgnum):
        """
        Get the timestamp of the last scrub.
        """
        stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
        return stats["last_scrub_stamp"]

    def do_pg_scrub(self, pool, pgnum, stype):
        """
        Scrub pg and wait for scrubbing to finish
        """
        init = self.get_last_scrub_stamp(pool, pgnum)
        RESEND_TIMEOUT = 120    # Must be a multiple of SLEEP_TIME
        FATAL_TIMEOUT = RESEND_TIMEOUT * 3
        SLEEP_TIME = 10
        timer = 0
        while init == self.get_last_scrub_stamp(pool, pgnum):
            assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
            self.log("waiting for scrub type %s" % (stype,))
            if (timer % RESEND_TIMEOUT) == 0:
                self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
                # The first time in this loop is the actual request
                if timer != 0 and stype == "repair":
                    self.log("WARNING: Resubmitted a non-idempotent repair")
            time.sleep(SLEEP_TIME)
            timer += SLEEP_TIME
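
    # Illustrative note (not part of the original module): do_pg_scrub() polls the pg's
    # last_scrub_stamp and re-issues the scrub request every RESEND_TIMEOUT seconds in case
    # the first one was dropped, while a FATAL_TIMEOUT assertion bounds the whole wait.
    # The same shape in miniature:
    #
    #     timer = 0
    #     while not condition():              # hypothetical completion check
    #         assert timer < FATAL_TIMEOUT
    #         if timer % RESEND_TIMEOUT == 0:
    #             send_request()              # hypothetical; idempotence caveat applies
    #         time.sleep(SLEEP_TIME)
    #         timer += SLEEP_TIME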
    def wait_snap_trimming_complete(self, pool):
        """
        Wait for snap trimming on pool to end
        """
        POLL_PERIOD = 10
        FATAL_TIMEOUT = 600
        start = time.time()
        poolnum = self.get_pool_num(pool)
        poolnumstr = "%s." % (poolnum,)
        while (True):
            now = time.time()
            if (now - start) > FATAL_TIMEOUT:
                assert (now - start) < FATAL_TIMEOUT, \
                    'failed to complete snap trimming before timeout'
            all_stats = self.get_pg_stats()
            trimming = False
            for pg in all_stats:
                if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
                    self.log("pg {pg} in trimming, state: {state}".format(
                        pg=pg['pgid'],
                        state=pg['state']))
                    trimming = True
            if not trimming:
                break
            self.log("{pool} still trimming, waiting".format(pool=pool))
            time.sleep(POLL_PERIOD)

    def get_single_pg_stats(self, pgid):
        """
        Return pg for the pgid specified.
        """
        all_stats = self.get_pg_stats()
        for pg in all_stats:
            if pg['pgid'] == pgid:
                return pg
        return None

    def get_object_pg_with_shard(self, pool, name, osdid):
        pool_dump = self.get_pool_dump(pool)
        object_map = self.get_object_map(pool, name)
        if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
            shard = object_map['acting'].index(osdid)
            return "{pgid}s{shard}".format(pgid=object_map['pgid'],
                                           shard=shard)
        else:
            return object_map['pgid']

    def get_object_primary(self, pool, name):
        object_map = self.get_object_map(pool, name)
        return object_map['acting_primary']

    def get_object_map(self, pool, name):
        """
        osd map --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
        return json.loads('\n'.join(out.split('\n')[1:]))
    def get_osd_dump_json(self):
        """
        osd dump --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_osd_dump(self):
        """
        Dump osds
        :returns: all osds
        """
        return self.get_osd_dump_json()['osds']

    def get_mgr_dump(self):
        out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
        return json.loads(out)

    def get_stuck_pgs(self, type_, threshold):
        """
        :returns: stuck pg information from the cluster
        """
        out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
                                   '--format=json')
        return json.loads(out)

    def get_num_unfound_objects(self):
        """
        Check cluster status to get the number of unfound objects
        """
        status = self.raw_cluster_status()
        self.log(status)
        return status['pgmap'].get('unfound_objects', 0)

    def get_num_creating(self):
        """
        Find the number of pgs in creating mode.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if 'creating' in pg['state']:
                num += 1
        return num

    def get_num_active_clean(self):
        """
        Find the number of active and clean pgs.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    pg['state'].count('clean') and
                    not pg['state'].count('stale')):
                num += 1
        return num

    def get_num_active_recovered(self):
        """
        Find the number of active and recovered pgs.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    not pg['state'].count('recover') and
                    not pg['state'].count('backfilling') and
                    not pg['state'].count('stale')):
                num += 1
        return num
2059 def get_is_making_recovery_progress(self
):
2061 Return whether there is recovery progress discernable in the
2064 status
= self
.raw_cluster_status()
2065 kps
= status
['pgmap'].get('recovering_keys_per_sec', 0)
2066 bps
= status
['pgmap'].get('recovering_bytes_per_sec', 0)
2067 ops
= status
['pgmap'].get('recovering_objects_per_sec', 0)
2068 return kps
> 0 or bps
> 0 or ops
> 0
2070 def get_num_active(self
):
2072 Find the number of active pgs.
2074 pgs
= self
.get_pg_stats()
2077 if pg
['state'].count('active') and not pg
['state'].count('stale'):
2081 def get_num_down(self
):
2083 Find the number of pgs that are down.
2085 pgs
= self
.get_pg_stats()
2088 if ((pg
['state'].count('down') and not
2089 pg
['state'].count('stale')) or
2090 (pg
['state'].count('incomplete') and not
2091 pg
['state'].count('stale'))):
2095 def get_num_active_down(self
):
2097 Find the number of pgs that are either active or down.
2099 pgs
= self
.get_pg_stats()
2102 if ((pg
['state'].count('active') and not
2103 pg
['state'].count('stale')) or
2104 (pg
['state'].count('down') and not
2105 pg
['state'].count('stale')) or
2106 (pg
['state'].count('incomplete') and not
2107 pg
['state'].count('stale'))):
    def is_clean(self):
        """
        True if all pgs are clean
        """
        return self.get_num_active_clean() == self.get_num_pgs()

    def is_recovered(self):
        """
        True if all pgs have recovered
        """
        return self.get_num_active_recovered() == self.get_num_pgs()

    def is_active_or_down(self):
        """
        True if all pgs are active or down
        """
        return self.get_num_active_down() == self.get_num_pgs()
    def wait_for_clean(self, timeout=None):
        """
        Returns when all pgs are clean.
        """
        self.log("waiting for clean")
        start = time.time()
        num_active_clean = self.get_num_active_clean()
        while not self.is_clean():
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if time.time() - start >= timeout:
                        self.log('dumping pgs')
                        out = self.raw_cluster_cmd('pg', 'dump')
                        self.log(out)
                        assert time.time() - start < timeout, \
                            'failed to become clean before timeout expired'
            cur_active_clean = self.get_num_active_clean()
            if cur_active_clean != num_active_clean:
                start = time.time()
                num_active_clean = cur_active_clean
            time.sleep(3)
        self.log("clean!")
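    # Typical use from a teuthology task body; a minimal sketch assuming a
    # CephManager has already been constructed for the cluster (names are
    # illustrative only):
    #
    #   manager = ctx.managers['ceph']
    #   manager.wait_for_clean(timeout=900)
    #
    # The timeout is intentionally "soft": it is re-armed whenever recovery
    # throughput or the active+clean count changes, so only a truly stalled
    # cluster trips the assertion.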
    def are_all_osds_up(self):
        """
        Returns true if all osds are up.
        """
        x = self.get_osd_dump()
        return (len(x) == sum([(y['up'] > 0) for y in x]))
    def wait_for_all_osds_up(self, timeout=None):
        """
        When this exits, either the timeout has expired, or all
        osds are up.
        """
        self.log("waiting for all up")
        start = time.time()
        while not self.are_all_osds_up():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_all_osds_up'
            time.sleep(3)
        self.log("all up!")

    def pool_exists(self, pool):
        if pool in self.list_pools():
            return True
        return False

    def wait_for_pool(self, pool, timeout=300):
        """
        Wait for a pool to exist
        """
        self.log('waiting for pool %s to exist' % pool)
        start = time.time()
        while not self.pool_exists(pool):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_pool'
            time.sleep(3)

    def wait_for_pools(self, pools):
        for pool in pools:
            self.wait_for_pool(pool)

    def is_mgr_available(self):
        x = self.get_mgr_dump()
        return x.get('available', False)

    def wait_for_mgr_available(self, timeout=None):
        self.log("waiting for mgr available")
        start = time.time()
        while not self.is_mgr_available():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_mgr_available'
            time.sleep(3)
        self.log("mgr available!")
    def wait_for_recovery(self, timeout=None):
        """
        Check peering. When this exits, we have recovered.
        """
        self.log("waiting for recovery to complete")
        start = time.time()
        num_active_recovered = self.get_num_active_recovered()
        while not self.is_recovered():
            now = time.time()
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if now - start >= timeout:
                        if self.is_recovered():
                            break
                        self.log('dumping pgs')
                        out = self.raw_cluster_cmd('pg', 'dump')
                        self.log(out)
                        assert now - start < timeout, \
                            'failed to recover before timeout expired'
            cur_active_recovered = self.get_num_active_recovered()
            if cur_active_recovered != num_active_recovered:
                start = time.time()
                num_active_recovered = cur_active_recovered
            time.sleep(3)
        self.log("recovered!")
    def wait_for_active(self, timeout=None):
        """
        Check peering. When this exits, we are definitely active
        """
        self.log("waiting for peering to complete")
        start = time.time()
        num_active = self.get_num_active()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to become active before timeout expired'
            cur_active = self.get_num_active()
            if cur_active != num_active:
                start = time.time()
                num_active = cur_active
            time.sleep(3)
        self.log("active!")

    def wait_for_active_or_down(self, timeout=None):
        """
        Check peering. When this exits, we are definitely either
        active or down
        """
        self.log("waiting for peering to complete or become blocked")
        start = time.time()
        num_active_down = self.get_num_active_down()
        while not self.is_active_or_down():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to become active or down before timeout expired'
            cur_active_down = self.get_num_active_down()
            if cur_active_down != num_active_down:
                start = time.time()
                num_active_down = cur_active_down
            time.sleep(3)
        self.log("active or down!")
    def osd_is_up(self, osd):
        """
        Wrapper for osd check
        """
        osds = self.get_osd_dump()
        return osds[osd]['up'] > 0

    def wait_till_osd_is_up(self, osd, timeout=None):
        """
        Loop waiting for osd.
        """
        self.log('waiting for osd.%d to be up' % osd)
        start = time.time()
        while not self.osd_is_up(osd):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'osd.%d failed to come up before timeout expired' % osd
            time.sleep(3)
        self.log('osd.%d is up' % osd)

    def is_active(self):
        """
        Wrapper to check if all pgs are active
        """
        return self.get_num_active() == self.get_num_pgs()

    def wait_till_active(self, timeout=None):
        """
        Wait until all pgs are active.
        """
        self.log("waiting till active")
        start = time.time()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to become active before timeout expired'
            time.sleep(3)
        self.log("active!")
    def wait_till_pg_convergence(self, timeout=None):
        start = time.time()
        old_stats = None
        active_osds = [osd['osd'] for osd in self.get_osd_dump()
                       if osd['in'] and osd['up']]
        while True:
            # strictly speaking, no need to wait for mon. but due to the
            # "ms inject socket failures" setting, the osdmap could be delayed,
            # so mgr is likely to ignore the pg-stat messages with pgs serving
            # newly created pools which is not yet known by mgr. so, to make sure
            # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
            # necessary.
            self.flush_pg_stats(active_osds)
            new_stats = dict((stat['pgid'], stat['state'])
                             for stat in self.get_pg_stats())
            if old_stats == new_stats:
                return old_stats
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to reach convergence before %d secs' % timeout
            old_stats = new_stats
            # longer than mgr_stats_period
            time.sleep(5 + 1)
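    # flush_pg_stats() (defined elsewhere in this class) forces the OSDs to
    # report fresh pg stats to the mon/mgr, so two identical consecutive
    # snapshots of {pgid: state} are treated as evidence that the PG map has
    # converged rather than simply not having been refreshed yet.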
    def mark_out_osd(self, osd):
        """
        Wrapper to mark osd out.
        """
        self.raw_cluster_cmd('osd', 'out', str(osd))
    def kill_osd(self, osd):
        """
        Kill osds by either power cycling (if indicated by the config)
        or by stopping.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('kill_osd on osd.{o} '
                     'doing powercycle of {s}'.format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
            if random.uniform(0, 1) < self.config.get(
                    'bdev_inject_crash_probability', .5):
                self.raw_cluster_cmd(
                    '--', 'tell', 'osd.%d' % osd,
                    'injectargs',
                    '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
                )
                try:
                    self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
                except:
                    pass
                else:
                    raise RuntimeError('osd.%s did not fail' % osd)
            else:
                self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
        else:
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
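    # The bdev_inject_crash path tells the OSD to crash itself inside its
    # block device layer and then waits on the daemon: the expected outcome is
    # a failed exit (swallowed by the bare except), while a clean return from
    # wait() means the OSD survived the injection, so a RuntimeError is raised
    # to fail the test loudly instead of silently thrashing a live daemon.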
    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")
    def blackhole_kill_osd(self, osd):
        """
        Stop osd if nothing else works.
        """
        self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
                             'injectargs',
                             '--objectstore-blackhole')
        time.sleep(2)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    def revive_osd(self, osd, timeout=360, skip_admin_check=False):
        """
        Revive osds by either power cycling (if indicated by the config)
        or by restarting.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('revive_osd on osd.{o} doing powercycle of {s}'.
                     format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            if not remote.console.check_status(300):
                raise Exception('Failed to revive osd.{o} via ipmi'.
                                format(o=osd))
            teuthology.reconnect(self.ctx, 60, [remote])
            mount_osd_data(self.ctx, remote, self.cluster, str(osd))
            self.make_admin_daemon_dir(remote)
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

        if not skip_admin_check:
            # wait for dump_ops_in_flight; this command doesn't appear
            # until after the signal handler is installed and it is safe
            # to stop the osd again without making valgrind leak checks
            # unhappy. see #5924.
            self.wait_run_admin_socket('osd', osd,
                                       args=['dump_ops_in_flight'],
                                       timeout=timeout, stdout=DEVNULL)
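    # kill_osd()/revive_osd() are normally used as a pair by the Thrasher; a
    # minimal sketch of that cycle (illustrative only, error handling omitted):
    #
    #   manager.kill_osd(osd)
    #   manager.mark_down_osd(osd)
    #   ...                          # let recovery/backfill make progress
    #   manager.revive_osd(osd, timeout=360)
    #   manager.wait_till_osd_is_up(osd)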
    def mark_down_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'down', str(osd))

    def mark_in_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'in', str(osd))
    def signal_osd(self, osd, sig, silent=False):
        """
        Wrapper to local get_daemon call which sends the given
        signal to the given osd.
        """
        self.ctx.daemons.get_daemon('osd', osd,
                                    self.cluster).signal(sig, silent=silent)

    def signal_mon(self, mon, sig, silent=False):
        """
        Wrapper to local get_daemon call
        """
        self.ctx.daemons.get_daemon('mon', mon,
                                    self.cluster).signal(sig, silent=silent)
    def kill_mon(self, mon):
        """
        Kill the monitor by either power cycling (if the config says so),
        or by doing a normal stop.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('kill_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
    def revive_mon(self, mon):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()

    def revive_mgr(self, mgr):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mgr', mgr)
            self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
                     format(m=mgr, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
    def get_mon_status(self, mon):
        """
        Extract all the monitor status information from the cluster
        """
        addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
        out = self.raw_cluster_cmd('-m', addr, 'mon_status')
        return json.loads(out)

    def get_mon_quorum(self):
        """
        Extract monitor quorum information from the cluster
        """
        out = self.raw_cluster_cmd('quorum_status')
        j = json.loads(out)
        self.log('quorum_status is %s' % out)
        return j['quorum']
    def wait_for_mon_quorum_size(self, size, timeout=300):
        """
        Loop until quorum size is reached.
        """
        self.log('waiting for quorum size %d' % size)
        start = time.time()
        while not len(self.get_mon_quorum()) == size:
            if timeout is not None:
                assert time.time() - start < timeout, \
                    ('failed to reach quorum size %d '
                     'before timeout expired' % size)
            time.sleep(3)
        self.log("quorum is size %d" % size)
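    # Used when thrashing monitors: after killing a mon, a test can block on
    # the quorum shrinking and then recovering. A hedged sketch (mon ids and
    # sizes are illustrative only):
    #
    #   manager.kill_mon('a')
    #   manager.wait_for_mon_quorum_size(2)
    #   manager.revive_mon('a')
    #   manager.wait_for_mon_quorum_size(3)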
    def get_mon_health(self, debug=False):
        """
        Extract all the monitor health information.
        """
        out = self.raw_cluster_cmd('health', '--format=json')
        if debug:
            self.log('health:\n{h}'.format(h=out))
        return json.loads(out)
    def get_mds_status(self, mds):
        """
        Run cluster commands for the mds in order to get mds information
        """
        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
        j = json.loads(' '.join(out.splitlines()[1:]))
        # collate; for dup ids, larger gid wins.
        for info in j['info'].itervalues():
            if info['name'] == mds:
                return info
        return None
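    # Note: j['info'].itervalues() is Python 2 only (consistent with the
    # cStringIO/DEVNULL fallbacks at the top of this module); a Python 3 port
    # would use j['info'].values() instead.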
    def get_filepath(self):
        """
        Return path to osd data with {id} needing to be replaced
        """
        return '/var/lib/ceph/osd/' + self.cluster + '-{id}'

    def make_admin_daemon_dir(self, remote):
        """
        Create /var/run/ceph directory on remote site.

        :param remote: Remote site
        """
        remote.run(args=['sudo',
                         'install', '-d', '-m0777', '--', '/var/run/ceph', ], )


def utility_task(name):
    """
    Generate ceph_manager subtask corresponding to ceph_manager
    method name
    """
    def task(ctx, config):
        if config is None:
            config = {}
        args = config.get('args', [])
        kwargs = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        fn = getattr(ctx.managers[cluster], name)
        fn(*args, **kwargs)
    return task

revive_osd = utility_task("revive_osd")
revive_mon = utility_task("revive_mon")
kill_osd = utility_task("kill_osd")
kill_mon = utility_task("kill_mon")
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
flush_all_pg_stats = utility_task("flush_all_pg_stats")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
wait_for_pool = utility_task("wait_for_pool")
wait_for_pools = utility_task("wait_for_pools")