ceph/qa/tasks/ceph_manager.py
1"""
2ceph manager -- Thrasher and CephManager objects
3"""
4from cStringIO import StringIO
5from functools import wraps
6import contextlib
7import random
8import signal
9import time
10import gevent
11import base64
12import json
13import logging
14import threading
15import traceback
16import os
17from teuthology import misc as teuthology
18from tasks.scrub import Scrubber
19from util.rados import cmd_erasure_code_profile
20from util import get_remote
21from teuthology.contextutil import safe_while
22from teuthology.orchestra.remote import Remote
23from teuthology.orchestra import run
24from teuthology.exceptions import CommandFailedError
25
26try:
27 from subprocess import DEVNULL # py3k
28except ImportError:
29 DEVNULL = open(os.devnull, 'r+')
30
31DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33log = logging.getLogger(__name__)
34
35
36def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
37 conf_fp = StringIO()
38 ctx.ceph[cluster].conf.write(conf_fp)
39 conf_fp.seek(0)
40 writes = ctx.cluster.run(
41 args=[
42 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
44 'sudo', 'python',
45 '-c',
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
48 conf_path,
49 run.Raw('&&'),
50 'sudo', 'chmod', '0644', conf_path,
51 ],
52 stdin=run.PIPE,
53 wait=False)
54 teuthology.feed_many_stdins_and_close(conf_fp, writes)
55 run.wait(writes)
56
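# Illustrative usage sketch (not part of the original file; the teuthology
# `ctx` object and the alternate path are assumptions about the caller):
#
#     from tasks import ceph_manager
#     ceph_manager.write_conf(ctx)                                  # push ctx.ceph['ceph'].conf to /etc/ceph/ceph.conf
#     ceph_manager.write_conf(ctx, conf_path='/etc/ceph/alt.conf')  # hypothetical alternate destination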
57
58def mount_osd_data(ctx, remote, cluster, osd):
59 """
60 Mount a remote OSD
61
62 :param ctx: Context
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
65 :param osd: Osd name
66 """
67 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68 role = "{0}.osd.{1}".format(cluster, osd)
69 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70 if remote in ctx.disk_config.remote_to_roles_to_dev:
71 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
72 role = alt_role
73 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
74 return
75 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76 mount_options = ctx.disk_config.\
77 remote_to_roles_to_dev_mount_options[remote][role]
78 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
80
81 log.info('Mounting osd.{o}: dev: {n}, cluster: {c}, '
82 'mountpoint: {p}, type: {t}, options: {v}'.format(
83 o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
84 c=cluster))
85
86 remote.run(
87 args=[
88 'sudo',
89 'mount',
90 '-t', fstype,
91 '-o', ','.join(mount_options),
92 dev,
93 mnt,
94 ]
95 )
96
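# Illustrative usage sketch (not part of the original file; `ctx` and the
# role-to-device mappings are assumed to come from the calling task):
#
#     remote = ctx.cluster.only('osd.0').remotes.keys()[0]
#     mount_osd_data(ctx, remote, 'ceph', '0')   # silently returns if the role has no recorded device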
97
98class Thrasher:
99 """
100 Object used to thrash Ceph
101 """
102 def __init__(self, manager, config, logger=None):
103 self.ceph_manager = manager
104 self.cluster = manager.cluster
105 self.ceph_manager.wait_for_clean()
106 osd_status = self.ceph_manager.get_osd_status()
107 self.in_osds = osd_status['in']
108 self.live_osds = osd_status['live']
109 self.out_osds = osd_status['out']
110 self.dead_osds = osd_status['dead']
111 self.stopping = False
112 self.logger = logger
113 self.config = config
114 self.revive_timeout = self.config.get("revive_timeout", 150)
115 self.pools_to_fix_pgp_num = set()
116 if self.config.get('powercycle'):
117 self.revive_timeout += 120
118 self.clean_wait = self.config.get('clean_wait', 0)
119 self.minin = self.config.get("min_in", 3)
120 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121 self.sighup_delay = self.config.get('sighup_delay')
122 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123 self.dump_ops_enable = self.config.get('dump_ops_enable')
124 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
127 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
128 self.random_eio = self.config.get('random_eio')
129 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
130
131 num_osds = self.in_osds + self.out_osds
132 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
133 if self.logger is not None:
134 self.log = lambda x: self.logger.info(x)
135 else:
136 def tmp(x):
137 """
138 Implement log behavior
139 """
140 print x
141 self.log = tmp
142 if self.config is None:
143 self.config = dict()
144 # prevent monitor from auto-marking things out while thrasher runs
145 # try both old and new tell syntax, in case we are testing old code
146 self.saved_options = []
147 # assuming that the default settings do not vary from one daemon to
148 # another
149 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
150 opts = [('mon', 'mon_osd_down_out_interval', 0)]
151 for service, opt, new_value in opts:
152 old_value = manager.get_config(first_mon[0],
153 first_mon[1],
154 opt)
155 self.saved_options.append((service, opt, old_value))
156 self._set_config(service, '*', opt, new_value)
157 # initialize ceph_objectstore_tool property - must be done before
158 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
159 if (self.config.get('powercycle') or
160 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
161 self.config.get('disable_objectstore_tool_tests', False)):
162 self.ceph_objectstore_tool = False
163 self.test_rm_past_intervals = False
164 if self.config.get('powercycle'):
165 self.log("Unable to test ceph-objectstore-tool, "
166 "powercycle testing")
167 else:
168 self.log("Unable to test ceph-objectstore-tool, "
169 "not available on all OSD nodes")
170 else:
171 self.ceph_objectstore_tool = \
172 self.config.get('ceph_objectstore_tool', True)
173 self.test_rm_past_intervals = \
174 self.config.get('test_rm_past_intervals', True)
175 # spawn do_thrash
176 self.thread = gevent.spawn(self.do_thrash)
177 if self.sighup_delay:
178 self.sighup_thread = gevent.spawn(self.do_sighup)
179 if self.optrack_toggle_delay:
180 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
181 if self.dump_ops_enable == "true":
182 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
183 if self.noscrub_toggle_delay:
184 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
185
186 def _set_config(self, service_type, service_id, name, value):
187 opt_arg = '--{name} {value}'.format(name=name, value=value)
188 whom = '.'.join([service_type, service_id])
189 self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
190 'injectargs', opt_arg)
191
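# Illustrative sketch (assumption, not part of the original file): the tell
# issued by _set_config above is roughly equivalent to running
#
#     ceph --cluster ceph -- tell mon.* injectargs '--mon_osd_down_out_interval 0'
#
# where the leading '--' stops the outer ceph CLI from parsing the option string.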
192
193 def cmd_exists_on_osds(self, cmd):
194 allremotes = self.ceph_manager.ctx.cluster.only(\
195 teuthology.is_type('osd', self.cluster)).remotes.keys()
196 allremotes = list(set(allremotes))
197 for remote in allremotes:
198 proc = remote.run(args=['type', cmd], wait=True,
199 check_status=False, stdout=StringIO(),
200 stderr=StringIO())
201 if proc.exitstatus != 0:
202 return False
203 return True
204
205 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
206 """
207 :param osd: Osd to be killed.
208 :param mark_down: Mark down if true.
209 :param mark_out: Mark out if true.
210 """
211 if osd is None:
212 osd = random.choice(self.live_osds)
213 self.log("Killing osd %s, live_osds are %s" % (str(osd),
214 str(self.live_osds)))
215 self.live_osds.remove(osd)
216 self.dead_osds.append(osd)
217 self.ceph_manager.kill_osd(osd)
218 if mark_down:
219 self.ceph_manager.mark_down_osd(osd)
220 if mark_out and osd in self.in_osds:
221 self.out_osd(osd)
222 if self.ceph_objectstore_tool:
223 self.log("Testing ceph-objectstore-tool on down osd")
224 remote = self.ceph_manager.find_remote('osd', osd)
225 FSPATH = self.ceph_manager.get_filepath()
226 JPATH = os.path.join(FSPATH, "journal")
227 exp_osd = imp_osd = osd
228 exp_remote = imp_remote = remote
229 # If an older osd is available we'll move a pg from there
230 if (len(self.dead_osds) > 1 and
231 random.random() < self.chance_move_pg):
232 exp_osd = random.choice(self.dead_osds[:-1])
233 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
234 if ('keyvaluestore_backend' in
235 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
236 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
237 "--data-path {fpath} --journal-path {jpath} "
238 "--type keyvaluestore "
239 "--log-file="
240 "/var/log/ceph/objectstore_tool.\\$pid.log ".
241 format(fpath=FSPATH, jpath=JPATH))
242 else:
243 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
244 "--data-path {fpath} --journal-path {jpath} "
245 "--log-file="
246 "/var/log/ceph/objectstore_tool.\\$pid.log ".
247 format(fpath=FSPATH, jpath=JPATH))
248 cmd = (prefix + "--op list-pgs").format(id=exp_osd)
249
250 # ceph-objectstore-tool might be temporarily absent during an
251 # upgrade - see http://tracker.ceph.com/issues/18014
252 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
253 while proceed():
254 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
255 wait=True, check_status=False, stdout=StringIO(),
256 stderr=StringIO())
257 if proc.exitstatus == 0:
258 break
259 log.debug("ceph-objectstore-tool binary not present, trying again")
260
261 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
262 # see http://tracker.ceph.com/issues/19556
263 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
264 while proceed():
265 proc = exp_remote.run(args=cmd, wait=True,
266 check_status=False,
267 stdout=StringIO(), stderr=StringIO())
268 if proc.exitstatus == 0:
269 break
270 elif proc.exitstatus == 1 and proc.stderr == "OSD has the store locked":
271 continue
272 else:
273 raise Exception("ceph-objectstore-tool: "
274 "exp list-pgs failure with status {ret}".
275 format(ret=proc.exitstatus))
276
277 pgs = proc.stdout.getvalue().split('\n')[:-1]
278 if len(pgs) == 0:
279 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
280 return
281 pg = random.choice(pgs)
282 exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
283 exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
284 exp_path = os.path.join(exp_path,
285 "exp.{pg}.{id}".format(
286 pg=pg,
287 id=exp_osd))
288 # export
289 cmd = prefix + "--op export --pgid {pg} --file {file}"
290 cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
291 proc = exp_remote.run(args=cmd)
292 if proc.exitstatus:
293 raise Exception("ceph-objectstore-tool: "
294 "export failure with status {ret}".
295 format(ret=proc.exitstatus))
296 # remove
297 cmd = prefix + "--op remove --pgid {pg}"
298 cmd = cmd.format(id=exp_osd, pg=pg)
299 proc = exp_remote.run(args=cmd)
300 if proc.exitstatus:
301 raise Exception("ceph-objectstore-tool: "
302 "remove failure with status {ret}".
303 format(ret=proc.exitstatus))
304 # If there are at least 2 dead osds we might move the pg
305 if exp_osd != imp_osd:
306 # If pg isn't already on this osd, then we will move it there
307 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
308 proc = imp_remote.run(args=cmd, wait=True,
309 check_status=False, stdout=StringIO())
310 if proc.exitstatus:
311 raise Exception("ceph-objectstore-tool: "
312 "imp list-pgs failure with status {ret}".
313 format(ret=proc.exitstatus))
314 pgs = proc.stdout.getvalue().split('\n')[:-1]
315 if pg not in pgs:
316 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
317 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
318 if imp_remote != exp_remote:
319 # Copy export file to the other machine
320 self.log("Transfer export file from {srem} to {trem}".
321 format(srem=exp_remote, trem=imp_remote))
322 tmpexport = Remote.get_file(exp_remote, exp_path)
323 Remote.put_file(imp_remote, tmpexport, exp_path)
324 os.remove(tmpexport)
325 else:
326 # Can't move the pg after all
327 imp_osd = exp_osd
328 imp_remote = exp_remote
329 # import
330 cmd = (prefix + "--op import --file {file}")
331 cmd = cmd.format(id=imp_osd, file=exp_path)
332 proc = imp_remote.run(args=cmd, wait=True, check_status=False,
333 stderr=StringIO())
334 if proc.exitstatus == 1:
335 bogosity = "The OSD you are using is older than the exported PG"
336 if bogosity in proc.stderr.getvalue():
337 self.log("OSD older than exported PG"
338 "...ignored")
339 elif proc.exitstatus == 10:
340 self.log("Pool went away before processing an import"
341 "...ignored")
342 elif proc.exitstatus == 11:
343 self.log("Attempt to import an incompatible export"
344 "...ignored")
345 elif proc.exitstatus:
346 raise Exception("ceph-objectstore-tool: "
347 "import failure with status {ret}".
348 format(ret=proc.exitstatus))
349 cmd = "rm -f {file}".format(file=exp_path)
350 exp_remote.run(args=cmd)
351 if imp_remote != exp_remote:
352 imp_remote.run(args=cmd)
353
354 # apply low split settings to each pool
355 for pool in self.ceph_manager.list_pools():
356 no_sudo_prefix = prefix[5:]
357 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
358 "--filestore-split-multiple 1' sudo -E "
359 + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
360 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
361 output = proc.stderr.getvalue()
362 if 'Couldn\'t find pool' in output:
363 continue
364 if proc.exitstatus:
365 raise Exception("ceph-objectstore-tool apply-layout-settings"
366 " failed with {status}".format(status=proc.exitstatus))
367
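# Illustrative summary (assumption, not part of the original file) of the
# ceph-objectstore-tool round trip performed in kill_osd above when a PG is
# moved between two down OSDs (paths are hypothetical):
#
#     # on the export-side OSD host:
#     #   ceph-objectstore-tool --data-path ... --op list-pgs
#     #   ceph-objectstore-tool --data-path ... --op export --pgid 1.0 --file .../exp.1.0.2
#     #   ceph-objectstore-tool --data-path ... --op remove --pgid 1.0
#     # on the import-side OSD host:
#     #   ceph-objectstore-tool --data-path ... --op import --file .../exp.1.0.2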
368 def rm_past_intervals(self, osd=None):
369 """
370 :param osd: Osd to find pg to remove past intervals
371 """
372 if self.test_rm_past_intervals:
373 if osd is None:
374 osd = random.choice(self.dead_osds)
375 self.log("Use ceph_objectstore_tool to remove past intervals")
376 remote = self.ceph_manager.find_remote('osd', osd)
377 FSPATH = self.ceph_manager.get_filepath()
378 JPATH = os.path.join(FSPATH, "journal")
379 if ('keyvaluestore_backend' in
380 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
381 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
382 "--data-path {fpath} --journal-path {jpath} "
383 "--type keyvaluestore "
384 "--log-file="
385 "/var/log/ceph/objectstore_tool.\\$pid.log ".
386 format(fpath=FSPATH, jpath=JPATH))
387 else:
388 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
389 "--data-path {fpath} --journal-path {jpath} "
390 "--log-file="
391 "/var/log/ceph/objectstore_tool.\\$pid.log ".
392 format(fpath=FSPATH, jpath=JPATH))
393 cmd = (prefix + "--op list-pgs").format(id=osd)
394 proc = remote.run(args=cmd, wait=True,
395 check_status=False, stdout=StringIO())
396 if proc.exitstatus:
397 raise Exception("ceph_objectstore_tool: "
398 "exp list-pgs failure with status {ret}".
399 format(ret=proc.exitstatus))
400 pgs = proc.stdout.getvalue().split('\n')[:-1]
401 if len(pgs) == 0:
402 self.log("No PGs found for osd.{osd}".format(osd=osd))
403 return
404 pg = random.choice(pgs)
405 cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
406 format(id=osd, pg=pg)
407 proc = remote.run(args=cmd)
408 if proc.exitstatus:
409 raise Exception("ceph_objectstore_tool: "
410 "rm-past-intervals failure with status {ret}".
411 format(ret=proc.exitstatus))
412
413 def blackhole_kill_osd(self, osd=None):
414 """
415 If all else fails, kill the osd.
416 :param osd: Osd to be killed.
417 """
418 if osd is None:
419 osd = random.choice(self.live_osds)
420 self.log("Blackholing and then killing osd %s, live_osds are %s" %
421 (str(osd), str(self.live_osds)))
422 self.live_osds.remove(osd)
423 self.dead_osds.append(osd)
424 self.ceph_manager.blackhole_kill_osd(osd)
425
426 def revive_osd(self, osd=None, skip_admin_check=False):
427 """
428 Revive the osd.
429 :param osd: Osd to be revived.
430 """
431 if osd is None:
432 osd = random.choice(self.dead_osds)
433 self.log("Reviving osd %s" % (str(osd),))
434 self.ceph_manager.revive_osd(
435 osd,
436 self.revive_timeout,
437 skip_admin_check=skip_admin_check)
438 self.dead_osds.remove(osd)
439 self.live_osds.append(osd)
440 if self.random_eio > 0 and osd is self.rerrosd:
441 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
442 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
443 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
444 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
445
446
447 def out_osd(self, osd=None):
448 """
449 Mark the osd out
450 :param osd: Osd to be marked.
451 """
452 if osd is None:
453 osd = random.choice(self.in_osds)
454 self.log("Removing osd %s, in_osds are: %s" %
455 (str(osd), str(self.in_osds)))
456 self.ceph_manager.mark_out_osd(osd)
457 self.in_osds.remove(osd)
458 self.out_osds.append(osd)
459
460 def in_osd(self, osd=None):
461 """
462 Mark the osd in
463 :param osd: Osd to be marked.
464 """
465 if osd is None:
466 osd = random.choice(self.out_osds)
467 if osd in self.dead_osds:
468 return self.revive_osd(osd)
469 self.log("Adding osd %s" % (str(osd),))
470 self.out_osds.remove(osd)
471 self.in_osds.append(osd)
472 self.ceph_manager.mark_in_osd(osd)
473 self.log("Added osd %s" % (str(osd),))
474
475 def reweight_osd_or_by_util(self, osd=None):
476 """
477 Reweight an osd that is in
478 :param osd: Osd to be marked.
479 """
480 if osd is not None or random.choice([True, False]):
481 if osd is None:
482 osd = random.choice(self.in_osds)
483 val = random.uniform(.1, 1.0)
484 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
485 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
486 str(osd), str(val))
487 else:
488 # do it several times, the option space is large
489 for i in range(5):
490 options = {
491 'max_change': random.choice(['0.05', '1.0', '3.0']),
492 'overage': random.choice(['110', '1000']),
493 'type': random.choice([
494 'reweight-by-utilization',
495 'test-reweight-by-utilization']),
496 }
497 self.log("Reweighting by: %s"%(str(options),))
498 self.ceph_manager.raw_cluster_cmd(
499 'osd',
500 options['type'],
501 options['overage'],
502 options['max_change'])
503
504 def primary_affinity(self, osd=None):
505 if osd is None:
506 osd = random.choice(self.in_osds)
507 if random.random() >= .5:
508 pa = random.random()
509 elif random.random() >= .5:
510 pa = 1
511 else:
512 pa = 0
513 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
514 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
515 str(osd), str(pa))
516
517 def thrash_cluster_full(self):
518 """
519 Set and unset cluster full condition
520 """
521 self.log('Setting full ratio to .001')
522 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
523 time.sleep(1)
524 self.log('Setting full ratio back to .95')
525 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
526
527 def thrash_pg_upmap(self):
528 """
529 Install or remove random pg_upmap entries in OSDMap
530 """
531 from random import shuffle
532 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
533 j = json.loads(out)
534 self.log('j is %s' % j)
535 try:
536 if random.random() >= .3:
537 pgs = self.ceph_manager.get_pg_stats()
538 pg = random.choice(pgs)
539 pgid = str(pg['pgid'])
540 poolid = int(pgid.split('.')[0])
541 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
542 if len(sizes) == 0:
543 return
544 n = sizes[0]
545 osds = self.in_osds + self.out_osds
546 shuffle(osds)
547 osds = osds[0:n]
548 self.log('Setting %s to %s' % (pgid, osds))
549 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
550 self.log('cmd %s' % cmd)
551 self.ceph_manager.raw_cluster_cmd(*cmd)
552 else:
553 m = j['pg_upmap']
554 if len(m) > 0:
555 shuffle(m)
556 pg = m[0]['pgid']
557 self.log('Clearing pg_upmap on %s' % pg)
558 self.ceph_manager.raw_cluster_cmd(
559 'osd',
560 'rm-pg-upmap',
561 pg)
562 else:
563 self.log('No pg_upmap entries; doing nothing')
564 except CommandFailedError:
565 self.log('Failed to rm-pg-upmap, ignoring')
566
567 def thrash_pg_upmap_items(self):
568 """
569 Install or remove random pg_upmap_items entries in OSDMap
570 """
571 from random import shuffle
572 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
573 j = json.loads(out)
574 self.log('j is %s' % j)
575 try:
576 if random.random() >= .3:
577 pgs = self.ceph_manager.get_pg_stats()
578 pg = random.choice(pgs)
579 pgid = str(pg['pgid'])
580 poolid = int(pgid.split('.')[0])
581 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
582 if len(sizes) == 0:
583 return
584 n = sizes[0]
585 osds = self.in_osds + self.out_osds
586 shuffle(osds)
587 osds = osds[0:n*2]
588 self.log('Setting %s to %s' % (pgid, osds))
589 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
590 self.log('cmd %s' % cmd)
591 self.ceph_manager.raw_cluster_cmd(*cmd)
592 else:
593 m = j['pg_upmap_items']
594 if len(m) > 0:
595 shuffle(m)
596 pg = m[0]['pgid']
597 self.log('Clearing pg_upmap_items on %s' % pg)
598 self.ceph_manager.raw_cluster_cmd(
599 'osd',
600 'rm-pg-upmap-items',
601 pg)
602 else:
603 self.log('No pg_upmap_items entries; doing nothing')
604 except CommandFailedError:
605 self.log('Failed to rm-pg-upmap-items, ignoring')
606
607 def force_recovery(self):
608 """
609 Force recovery or backfill on a random subset of PGs
610 """
611 backfill = random.random() >= 0.5
612 j = self.ceph_manager.get_pgids_to_force(backfill)
613 if j:
614 if backfill:
615 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
616 else:
617 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
618
619 def cancel_force_recovery(self):
620 """
621 Cancel forced recovery or backfill on a random subset of PGs
622 """
623 backfill = random.random() >= 0.5
624 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
625 if j:
626 if backfill:
627 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
628 else:
629 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
630
631 def force_cancel_recovery(self):
632 """
633 Force or cancel forcing recovery
634 """
635 if random.random() >= 0.4:
636 self.force_recovery()
637 else:
638 self.cancel_force_recovery()
639
640 def all_up(self):
641 """
642 Make sure all osds are up and not out.
643 """
644 while len(self.dead_osds) > 0:
645 self.log("reviving osd")
646 self.revive_osd()
647 while len(self.out_osds) > 0:
648 self.log("inning osd")
649 self.in_osd()
650
651 def all_up_in(self):
652 """
653 Make sure all osds are up and fully in.
654 """
655 self.all_up()
656 for osd in self.live_osds:
657 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
658 str(osd), str(1))
659 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
660 str(osd), str(1))
661
662 def do_join(self):
663 """
664 Break out of this Ceph loop
665 """
666 self.stopping = True
667 self.thread.get()
668 if self.sighup_delay:
669 self.log("joining the do_sighup greenlet")
670 self.sighup_thread.get()
671 if self.optrack_toggle_delay:
672 self.log("joining the do_optrack_toggle greenlet")
673 self.optrack_toggle_thread.join()
674 if self.dump_ops_enable == "true":
675 self.log("joining the do_dump_ops greenlet")
676 self.dump_ops_thread.join()
677 if self.noscrub_toggle_delay:
678 self.log("joining the do_noscrub_toggle greenlet")
679 self.noscrub_toggle_thread.join()
680
681 def grow_pool(self):
682 """
683 Increase the size of the pool
684 """
685 pool = self.ceph_manager.get_pool()
686 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
687 self.log("Growing pool %s" % (pool,))
688 if self.ceph_manager.expand_pool(pool,
689 self.config.get('pool_grow_by', 10),
690 self.max_pgs):
691 self.pools_to_fix_pgp_num.add(pool)
692
693 def fix_pgp_num(self, pool=None):
694 """
695 Fix number of pgs in pool.
696 """
697 if pool is None:
698 pool = self.ceph_manager.get_pool()
699 force = False
700 else:
701 force = True
702 self.log("fixing pg num pool %s" % (pool,))
703 if self.ceph_manager.set_pool_pgpnum(pool, force):
704 self.pools_to_fix_pgp_num.discard(pool)
705
706 def test_pool_min_size(self):
707 """
708 Kill and revive all osds except one.
709 """
710 self.log("test_pool_min_size")
711 self.all_up()
712 self.ceph_manager.wait_for_recovery(
713 timeout=self.config.get('timeout')
714 )
715 the_one = random.choice(self.in_osds)
716 self.log("Killing everyone but %s" % (the_one,))
717 to_kill = filter(lambda x: x != the_one, self.in_osds)
718 [self.kill_osd(i) for i in to_kill]
719 [self.out_osd(i) for i in to_kill]
720 time.sleep(self.config.get("test_pool_min_size_time", 10))
721 self.log("Killing %s" % (the_one,))
722 self.kill_osd(the_one)
723 self.out_osd(the_one)
724 self.log("Reviving everyone but %s" % (the_one,))
725 [self.revive_osd(i) for i in to_kill]
726 [self.in_osd(i) for i in to_kill]
727 self.log("Revived everyone but %s" % (the_one,))
728 self.log("Waiting for clean")
729 self.ceph_manager.wait_for_recovery(
730 timeout=self.config.get('timeout')
731 )
732
733 def inject_pause(self, conf_key, duration, check_after, should_be_down):
734 """
735 Pause injection testing. Check for osd being down when finished.
736 """
737 the_one = random.choice(self.live_osds)
738 self.log("inject_pause on {osd}".format(osd=the_one))
739 self.log(
740 "Testing {key} pause injection for duration {duration}".format(
741 key=conf_key,
742 duration=duration
743 ))
744 self.log(
745 "Checking after {after}, should_be_down={shouldbedown}".format(
746 after=check_after,
747 shouldbedown=should_be_down
748 ))
749 self.ceph_manager.set_config(the_one, **{conf_key: duration})
750 if not should_be_down:
751 return
752 time.sleep(check_after)
753 status = self.ceph_manager.get_osd_status()
754 assert the_one in status['down']
755 time.sleep(duration - check_after + 20)
756 status = self.ceph_manager.get_osd_status()
757 assert the_one not in status['down']
758
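# Illustrative sketch (assumption, not part of the original file): the "long
# pause" scenario wired up by choose_action below calls inject_pause roughly
# like this, expecting the OSD to be reported down after 70 of the 80 seconds:
#
#     self.inject_pause('filestore_inject_stall',
#                       self.config.get('pause_long', 80),
#                       self.config.get('pause_check_after', 70),
#                       True)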
759 def test_backfill_full(self):
760 """
761 Test backfills stopping when the replica fills up.
762
763 First, use injectfull admin command to simulate a now full
764 osd by setting it to 0 on all of the OSDs.
765
766 Second, on a random subset, set
767 osd_debug_skip_full_check_in_backfill_reservation to force
768 the more complicated check in do_scan to be exercised.
769
770 Then, verify that all backfills stop.
771 """
772 self.log("injecting backfill full")
773 for i in self.live_osds:
774 self.ceph_manager.set_config(
775 i,
776 osd_debug_skip_full_check_in_backfill_reservation=
777 random.choice(['false', 'true']))
778 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
779 check_status=True, timeout=30, stdout=DEVNULL)
780 for i in range(30):
781 status = self.ceph_manager.compile_pg_status()
782 if 'backfill' not in status.keys():
783 break
784 self.log(
785 "waiting for {still_going} backfills".format(
786 still_going=status.get('backfill')))
787 time.sleep(1)
788 assert('backfill' not in self.ceph_manager.compile_pg_status().keys())
789 for i in self.live_osds:
790 self.ceph_manager.set_config(
791 i,
792 osd_debug_skip_full_check_in_backfill_reservation='false')
793 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
794 check_status=True, timeout=30, stdout=DEVNULL)
795
796 def test_map_discontinuity(self):
797 """
798 1) Allows the osds to recover
799 2) kills an osd
800 3) allows the remaining osds to recover
801 4) waits for some time
802 5) revives the osd
803 This sequence should cause the revived osd to have to handle
804 a map gap since the mons would have trimmed
805 """
806 while len(self.in_osds) < (self.minin + 1):
807 self.in_osd()
808 self.log("Waiting for recovery")
809 self.ceph_manager.wait_for_all_osds_up(
810 timeout=self.config.get('timeout')
811 )
812 # now we wait 20s for the pg status to change, if it takes longer,
813 # the test *should* fail!
814 time.sleep(20)
815 self.ceph_manager.wait_for_clean(
816 timeout=self.config.get('timeout')
817 )
818
819 # now we wait 20s for the backfill replicas to hear about the clean
820 time.sleep(20)
821 self.log("Recovered, killing an osd")
822 self.kill_osd(mark_down=True, mark_out=True)
823 self.log("Waiting for clean again")
824 self.ceph_manager.wait_for_clean(
825 timeout=self.config.get('timeout')
826 )
827 self.log("Waiting for trim")
828 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
829 self.revive_osd()
830
831 def choose_action(self):
832 """
833 Random action selector.
834 """
835 chance_down = self.config.get('chance_down', 0.4)
836 chance_test_min_size = self.config.get('chance_test_min_size', 0)
837 chance_test_backfill_full = \
838 self.config.get('chance_test_backfill_full', 0)
839 if isinstance(chance_down, int):
840 chance_down = float(chance_down) / 100
841 minin = self.minin
842 minout = self.config.get("min_out", 0)
843 minlive = self.config.get("min_live", 2)
844 mindead = self.config.get("min_dead", 0)
845
846 self.log('choose_action: min_in %d min_out '
847 '%d min_live %d min_dead %d' %
848 (minin, minout, minlive, mindead))
849 actions = []
850 if len(self.in_osds) > minin:
851 actions.append((self.out_osd, 1.0,))
852 if len(self.live_osds) > minlive and chance_down > 0:
853 actions.append((self.kill_osd, chance_down,))
854 if len(self.dead_osds) > 1:
855 actions.append((self.rm_past_intervals, 1.0,))
856 if len(self.out_osds) > minout:
857 actions.append((self.in_osd, 1.7,))
858 if len(self.dead_osds) > mindead:
859 actions.append((self.revive_osd, 1.0,))
860 if self.config.get('thrash_primary_affinity', True):
861 actions.append((self.primary_affinity, 1.0,))
862 actions.append((self.reweight_osd_or_by_util,
863 self.config.get('reweight_osd', .5),))
864 actions.append((self.grow_pool,
865 self.config.get('chance_pgnum_grow', 0),))
866 actions.append((self.fix_pgp_num,
867 self.config.get('chance_pgpnum_fix', 0),))
868 actions.append((self.test_pool_min_size,
869 chance_test_min_size,))
870 actions.append((self.test_backfill_full,
871 chance_test_backfill_full,))
872 if self.chance_thrash_cluster_full > 0:
873 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
874 if self.chance_thrash_pg_upmap > 0:
875 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
876 if self.chance_thrash_pg_upmap_items > 0:
877 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
878 if self.chance_force_recovery > 0:
879 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
880
881 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
882 for scenario in [
883 (lambda:
884 self.inject_pause(key,
885 self.config.get('pause_short', 3),
886 0,
887 False),
888 self.config.get('chance_inject_pause_short', 1),),
889 (lambda:
890 self.inject_pause(key,
891 self.config.get('pause_long', 80),
892 self.config.get('pause_check_after', 70),
893 True),
894 self.config.get('chance_inject_pause_long', 0),)]:
895 actions.append(scenario)
896
897 total = sum([y for (x, y) in actions])
898 val = random.uniform(0, total)
899 for (action, prob) in actions:
900 if val < prob:
901 return action
902 val -= prob
903 return None
904
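# Minimal standalone sketch (not part of the original file) of the weighted
# selection used by choose_action above: draw a uniform sample in [0, total)
# and return the first action whose weight exceeds what is left of the sample.
#
#     import random
#
#     def weighted_choice(actions):          # actions: [(callable, weight), ...]
#         total = sum(w for _, w in actions)
#         val = random.uniform(0, total)
#         for action, weight in actions:
#             if val < weight:
#                 return action
#             val -= weight
#         return None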
905 def log_exc(func):
906 @wraps(func)
907 def wrapper(self):
908 try:
909 return func(self)
910 except:
911 self.log(traceback.format_exc())
912 raise
913 return wrapper
914
915 @log_exc
916 def do_sighup(self):
917 """
918 Loops and sends signal.SIGHUP to a random live osd.
919
920 Loop delay is controlled by the config value sighup_delay.
921 """
922 delay = float(self.sighup_delay)
923 self.log("starting do_sighup with a delay of {0}".format(delay))
924 while not self.stopping:
925 osd = random.choice(self.live_osds)
926 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
927 time.sleep(delay)
928
929 @log_exc
930 def do_optrack_toggle(self):
931 """
932 Loops and toggle op tracking to all osds.
933
934 Loop delay is controlled by the config value optrack_toggle_delay.
935 """
936 delay = float(self.optrack_toggle_delay)
937 osd_state = "true"
938 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
939 while not self.stopping:
940 if osd_state == "true":
941 osd_state = "false"
942 else:
943 osd_state = "true"
944 self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
945 'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
946 gevent.sleep(delay)
947
948 @log_exc
949 def do_dump_ops(self):
950 """
951 Loops and does op dumps on all osds
952 """
953 self.log("starting do_dump_ops")
954 while not self.stopping:
955 for osd in self.live_osds:
956 # Ignore errors because live_osds is in flux
957 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
958 check_status=False, timeout=30, stdout=DEVNULL)
959 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
960 check_status=False, timeout=30, stdout=DEVNULL)
961 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
962 check_status=False, timeout=30, stdout=DEVNULL)
963 gevent.sleep(0)
964
965 @log_exc
966 def do_noscrub_toggle(self):
967 """
968 Loops and toggle noscrub flags
969
970 Loop delay is controlled by the config value noscrub_toggle_delay.
971 """
972 delay = float(self.noscrub_toggle_delay)
973 scrub_state = "none"
974 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
975 while not self.stopping:
976 if scrub_state == "none":
977 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
978 scrub_state = "noscrub"
979 elif scrub_state == "noscrub":
980 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
981 scrub_state = "both"
982 elif scrub_state == "both":
983 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
984 scrub_state = "nodeep-scrub"
985 else:
986 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
987 scrub_state = "none"
988 gevent.sleep(delay)
989 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
990 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
991
992 @log_exc
993 def do_thrash(self):
994 """
995 Loop to select random actions to thrash ceph manager with.
996 """
997 cleanint = self.config.get("clean_interval", 60)
998 scrubint = self.config.get("scrub_interval", -1)
999 maxdead = self.config.get("max_dead", 0)
1000 delay = self.config.get("op_delay", 5)
1001 self.rerrosd = self.live_osds[0]
1002 if self.random_eio > 0:
1003 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1004 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
1005 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1006 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
1007 self.log("starting do_thrash")
1008 while not self.stopping:
1009 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1010 "out_osds: ", self.out_osds,
1011 "dead_osds: ", self.dead_osds,
1012 "live_osds: ", self.live_osds]]
1013 self.log(" ".join(to_log))
1014 if random.uniform(0, 1) < (float(delay) / cleanint):
1015 while len(self.dead_osds) > maxdead:
1016 self.revive_osd()
1017 for osd in self.in_osds:
1018 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1019 str(osd), str(1))
1020 if random.uniform(0, 1) < float(
1021 self.config.get('chance_test_map_discontinuity', 0)):
1022 self.test_map_discontinuity()
1023 else:
1024 self.ceph_manager.wait_for_recovery(
1025 timeout=self.config.get('timeout')
1026 )
1027 time.sleep(self.clean_wait)
1028 if scrubint > 0:
1029 if random.uniform(0, 1) < (float(delay) / scrubint):
1030 self.log('Scrubbing while thrashing being performed')
1031 Scrubber(self.ceph_manager, self.config)
1032 self.choose_action()()
1033 time.sleep(delay)
1034 self.all_up()
1035 if self.random_eio > 0:
1036 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1037 'injectargs', '--', '--filestore_debug_random_read_err=0.0')
1038 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1039 'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
1040 for pool in list(self.pools_to_fix_pgp_num):
1041 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1042 self.fix_pgp_num(pool)
1043 self.pools_to_fix_pgp_num.clear()
1044 for service, opt, saved_value in self.saved_options:
1045 self._set_config(service, '*', opt, saved_value)
1046 self.saved_options = []
1047 self.all_up_in()
1048
1049
1050class ObjectStoreTool:
1051
1052 def __init__(self, manager, pool, **kwargs):
1053 self.manager = manager
1054 self.pool = pool
1055 self.osd = kwargs.get('osd', None)
1056 self.object_name = kwargs.get('object_name', None)
1057 self.do_revive = kwargs.get('do_revive', True)
1058 if self.osd and self.pool and self.object_name:
1059 if self.osd == "primary":
1060 self.osd = self.manager.get_object_primary(self.pool,
1061 self.object_name)
1062 assert self.osd
1063 if self.object_name:
1064 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1065 self.object_name,
1066 self.osd)
1067 self.remote = self.manager.ctx.\
1068 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1069 path = self.manager.get_filepath().format(id=self.osd)
1070 self.paths = ("--data-path {path} --journal-path {path}/journal".
1071 format(path=path))
1072
1073 def build_cmd(self, options, args, stdin):
1074 lines = []
1075 if self.object_name:
1076 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1077 "{paths} --pgid {pgid} --op list |"
1078 "grep '\"oid\":\"{name}\"')".
1079 format(paths=self.paths,
1080 pgid=self.pgid,
1081 name=self.object_name))
1082 args = '"$object" ' + args
1083 options += " --pgid {pgid}".format(pgid=self.pgid)
1084 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1085 format(paths=self.paths,
1086 args=args,
1087 options=options))
1088 if stdin:
1089 cmd = ("echo {payload} | base64 --decode | {cmd}".
1090 format(payload=base64.b64encode(stdin),
1091 cmd=cmd))
1092 lines.append(cmd)
1093 return "\n".join(lines)
1094
1095 def run(self, options, args, stdin=None, stdout=None):
1096 if stdout is None:
1097 stdout = StringIO()
1098 self.manager.kill_osd(self.osd)
1099 cmd = self.build_cmd(options, args, stdin)
1100 self.manager.log(cmd)
1101 try:
1102 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1103 check_status=False,
1104 stdout=stdout,
1105 stderr=StringIO())
1106 proc.wait()
1107 if proc.exitstatus != 0:
1108 self.manager.log("failed with " + str(proc.exitstatus))
1109 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1110 raise Exception(error)
1111 finally:
1112 if self.do_revive:
1113 self.manager.revive_osd(self.osd)
1114 self.manager.wait_till_osd_is_up(self.osd, 300)
1115
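# Illustrative sketch (assumption, not part of the original file): callers
# normally reach ObjectStoreTool through CephManager.objectstore_tool below,
# e.g. to rewrite an object's on-disk data on its primary OSD:
#
#     manager.objectstore_tool(pool='rbd',
#                              options='',
#                              args='set-bytes /tmp/junk',   # hypothetical payload path
#                              object_name='obj1',
#                              osd='primary')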
1116
1117class CephManager:
1118 """
1119 Ceph manager object.
1120 Contains several local functions that form a bulk of this module.
1121
1122 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1123 the same name.
1124 """
1125
1126 REPLICATED_POOL = 1
1127 ERASURE_CODED_POOL = 3
1128
1129 def __init__(self, controller, ctx=None, config=None, logger=None,
1130 cluster='ceph'):
1131 self.lock = threading.RLock()
1132 self.ctx = ctx
1133 self.config = config
1134 self.controller = controller
1135 self.next_pool_id = 0
1136 self.cluster = cluster
1137 if (logger):
1138 self.log = lambda x: logger.info(x)
1139 else:
1140 def tmp(x):
1141 """
1142 implement log behavior.
1143 """
1144 print x
1145 self.log = tmp
1146 if self.config is None:
1147 self.config = dict()
1148 pools = self.list_pools()
1149 self.pools = {}
1150 for pool in pools:
1151 # we may race with a pool deletion; ignore failures here
1152 try:
1153 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1154 except CommandFailedError:
1155 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1156
1157 def raw_cluster_cmd(self, *args):
1158 """
1159 Run a ceph command against the cluster and return its stdout.
1160 """
1161 testdir = teuthology.get_testdir(self.ctx)
1162 ceph_args = [
1163 'sudo',
1164 'adjust-ulimits',
1165 'ceph-coverage',
1166 '{tdir}/archive/coverage'.format(tdir=testdir),
1167 'timeout',
1168 '120',
1169 'ceph',
1170 '--cluster',
1171 self.cluster,
1172 ]
1173 ceph_args.extend(args)
1174 proc = self.controller.run(
1175 args=ceph_args,
1176 stdout=StringIO(),
1177 )
1178 return proc.stdout.getvalue()
1179
1180 def raw_cluster_cmd_result(self, *args):
1181 """
1182 Run a ceph command against the cluster and return its exit status.
1183 """
1184 testdir = teuthology.get_testdir(self.ctx)
1185 ceph_args = [
1186 'sudo',
1187 'adjust-ulimits',
1188 'ceph-coverage',
1189 '{tdir}/archive/coverage'.format(tdir=testdir),
1190 'timeout',
1191 '120',
1192 'ceph',
1193 '--cluster',
1194 self.cluster,
1195 ]
1196 ceph_args.extend(args)
1197 proc = self.controller.run(
1198 args=ceph_args,
1199 check_status=False,
1200 )
1201 return proc.exitstatus
1202
1203 def run_ceph_w(self):
1204 """
1205 Execute "ceph -w" in the background with stdout connected to a StringIO,
1206 and return the RemoteProcess.
1207 """
1208 return self.controller.run(
1209 args=["sudo",
1210 "daemon-helper",
1211 "kill",
1212 "ceph",
1213 '--cluster',
1214 self.cluster,
1215 "-w"],
1216 wait=False, stdout=StringIO(), stdin=run.PIPE)
1217
1218 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1219 """
1220 Flush pg stats from a list of OSD ids, ensuring they are reflected
1221 all the way to the monitor. Luminous and later only.
1222
1223 :param osds: list of OSDs to flush
1224 :param no_wait: list of OSDs not to wait for seq id. by default, we
1225 wait for all specified osds, but some of them could be
1226 moved out of osdmap, so we cannot get their updated
1227 stat seq from monitor anymore. in that case, you need
1228 to pass a blacklist.
1229 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1230 it. (5 min by default)
1231 """
1232 seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
1233 for osd in osds}
1234 if not wait_for_mon:
1235 return
1236 if no_wait is None:
1237 no_wait = []
1238 for osd, need in seq.iteritems():
1239 if osd in no_wait:
1240 continue
1241 got = 0
1242 while wait_for_mon > 0:
1243 got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
1244 self.log('need seq {need} got {got} for osd.{osd}'.format(
1245 need=need, got=got, osd=osd))
1246 if got >= need:
1247 break
1248 A_WHILE = 1
1249 time.sleep(A_WHILE)
1250 wait_for_mon -= A_WHILE
1251 else:
1252 raise Exception('timed out waiting for mon to be updated with '
1253 'osd.{osd}: {got} < {need}'.
1254 format(osd=osd, got=got, need=need))
1255
1256 def flush_all_pg_stats(self):
1257 self.flush_pg_stats(range(len(self.get_osd_dump())))
1258
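# Illustrative sketch (assumption, not part of the original file): flush pg
# stats for a few OSDs, skipping one that may have been removed from the
# osdmap so its stat seq is never awaited on the monitor:
#
#     manager.flush_pg_stats([0, 1, 2], no_wait=[2])
#     manager.flush_all_pg_stats()   # every OSD, waiting up to 5 minutes in total for the mon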
1259 def do_rados(self, remote, cmd, check_status=True):
1260 """
1261 Execute a remote rados command.
1262 """
1263 testdir = teuthology.get_testdir(self.ctx)
1264 pre = [
1265 'adjust-ulimits',
1266 'ceph-coverage',
1267 '{tdir}/archive/coverage'.format(tdir=testdir),
1268 'rados',
1269 '--cluster',
1270 self.cluster,
1271 ]
1272 pre.extend(cmd)
1273 proc = remote.run(
1274 args=pre,
1275 wait=True,
1276 check_status=check_status
1277 )
1278 return proc
1279
1280 def rados_write_objects(self, pool, num_objects, size,
1281 timelimit, threads, cleanup=False):
1282 """
1283 Write rados objects
1284 Threads not used yet.
1285 """
1286 args = [
1287 '-p', pool,
1288 '--num-objects', num_objects,
1289 '-b', size,
1290 'bench', timelimit,
1291 'write'
1292 ]
1293 if not cleanup:
1294 args.append('--no-cleanup')
1295 return self.do_rados(self.controller, map(str, args))
1296
1297 def do_put(self, pool, obj, fname, namespace=None):
1298 """
1299 Implement rados put operation
1300 """
1301 args = ['-p', pool]
1302 if namespace is not None:
1303 args += ['-N', namespace]
1304 args += [
1305 'put',
1306 obj,
1307 fname
1308 ]
1309 return self.do_rados(
1310 self.controller,
1311 args,
1312 check_status=False
1313 ).exitstatus
1314
1315 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1316 """
1317 Implement rados get operation
1318 """
1319 args = ['-p', pool]
1320 if namespace is not None:
1321 args += ['-N', namespace]
1322 args += [
1323 'get',
1324 obj,
1325 fname
1326 ]
1327 return self.do_rados(
1328 self.controller,
1329 args,
1330 check_status=False
1331 ).exitstatus
1332
1333 def do_rm(self, pool, obj, namespace=None):
1334 """
1335 Implement rados rm operation
1336 """
1337 args = ['-p', pool]
1338 if namespace is not None:
1339 args += ['-N', namespace]
1340 args += [
1341 'rm',
1342 obj
1343 ]
1344 return self.do_rados(
1345 self.controller,
1346 args,
1347 check_status=False
1348 ).exitstatus
1349
1350 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1351 if stdout is None:
1352 stdout = StringIO()
1353 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1354
1355 def find_remote(self, service_type, service_id):
1356 """
1357 Get the Remote for the host where a particular service runs.
1358
1359 :param service_type: 'mds', 'osd', 'client'
1360 :param service_id: The second part of a role, e.g. '0' for
1361 the role 'client.0'
1362 :return: a Remote instance for the host where the
1363 requested role is placed
1364 """
1365 return get_remote(self.ctx, self.cluster,
1366 service_type, service_id)
1367
1368 def admin_socket(self, service_type, service_id,
1369 command, check_status=True, timeout=0, stdout=None):
1370 """
1371 Remotely start up ceph specifying the admin socket
1372 :param command: a list of words to use as the command
1373 to the admin socket
1374 """
1375 if stdout is None:
1376 stdout = StringIO()
1377 testdir = teuthology.get_testdir(self.ctx)
1378 remote = self.find_remote(service_type, service_id)
1379 args = [
1380 'sudo',
1381 'adjust-ulimits',
1382 'ceph-coverage',
1383 '{tdir}/archive/coverage'.format(tdir=testdir),
1384 'timeout',
1385 str(timeout),
1386 'ceph',
1387 '--cluster',
1388 self.cluster,
1389 '--admin-daemon',
1390 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1391 cluster=self.cluster,
1392 type=service_type,
1393 id=service_id),
1394 ]
1395 args.extend(command)
1396 return remote.run(
1397 args=args,
1398 stdout=stdout,
1399 wait=True,
1400 check_status=check_status
1401 )
1402
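# Illustrative sketch (assumption, not part of the original file): querying a
# daemon's configuration over its admin socket; the call expands to roughly
# "ceph --cluster ceph --admin-daemon /var/run/ceph/ceph-osd.0.asok config show".
#
#     proc = manager.admin_socket('osd', 0, ['config', 'show'])
#     cfg = json.loads(proc.stdout.getvalue())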
1403 def objectstore_tool(self, pool, options, args, **kwargs):
1404 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1405
1406 def get_pgid(self, pool, pgnum):
1407 """
1408 :param pool: pool name
1409 :param pgnum: pg number
1410 :returns: a string representing this pg.
1411 """
1412 poolnum = self.get_pool_num(pool)
1413 pg_str = "{poolnum}.{pgnum}".format(
1414 poolnum=poolnum,
1415 pgnum=pgnum)
1416 return pg_str
1417
1418 def get_pg_replica(self, pool, pgnum):
1419 """
1420 get replica for pool, pgnum (e.g. (data, 0)->0)
1421 """
1422 pg_str = self.get_pgid(pool, pgnum)
1423 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1424 j = json.loads('\n'.join(output.split('\n')[1:]))
1425 return int(j['acting'][-1])
1426 assert False
1427
1428 def wait_for_pg_stats(func):
1429 # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1430 # by default, and take the faulty injection in ms into consideration,
1431 # 12 seconds are more than enough
1432 delays = [1, 1, 2, 3, 5, 8, 13]
1433 @wraps(func)
1434 def wrapper(self, *args, **kwargs):
1435 exc = None
1436 for delay in delays:
1437 try:
1438 return func(self, *args, **kwargs)
1439 except AssertionError as e:
1440 time.sleep(delay)
1441 exc = e
1442 raise exc
1443 return wrapper
1444
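# Illustrative sketch (assumption, not part of the original file): the
# decorator above retries an assertion-based check with Fibonacci-like delays
# to ride out pg-stat reporting lag; a hypothetical use would be
#
#     @wait_for_pg_stats
#     def assert_all_pgs_reported(self):
#         assert self.get_num_active_clean() == self.get_num_pgs()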
1445 def get_pg_primary(self, pool, pgnum):
1446 """
1447 get primary for pool, pgnum (e.g. (data, 0)->0)
1448 """
1449 pg_str = self.get_pgid(pool, pgnum)
1450 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1451 j = json.loads('\n'.join(output.split('\n')[1:]))
1452 return int(j['acting'][0])
1453 assert False
1454
1455 def get_pool_num(self, pool):
1456 """
1457 get number for pool (e.g., data -> 2)
1458 """
1459 return int(self.get_pool_dump(pool)['pool'])
1460
1461 def list_pools(self):
1462 """
1463 list all pool names
1464 """
1465 osd_dump = self.get_osd_dump_json()
1466 self.log(osd_dump['pools'])
1467 return [str(i['pool_name']) for i in osd_dump['pools']]
1468
1469 def clear_pools(self):
1470 """
1471 remove all pools
1472 """
1473 [self.remove_pool(i) for i in self.list_pools()]
1474
1475 def kick_recovery_wq(self, osdnum):
1476 """
1477 Run kick_recovery_wq on cluster.
1478 """
1479 return self.raw_cluster_cmd(
1480 'tell', "osd.%d" % (int(osdnum),),
1481 'debug',
1482 'kick_recovery_wq',
1483 '0')
1484
1485 def wait_run_admin_socket(self, service_type,
1486 service_id, args=['version'], timeout=75, stdout=None):
1487 """
1488 If the admin_socket call succeeds, return. Otherwise wait
1489 five seconds and try again.
1490 """
1491 if stdout is None:
1492 stdout = StringIO()
1493 tries = 0
1494 while True:
1495 proc = self.admin_socket(service_type, service_id,
1496 args, check_status=False, stdout=stdout)
1497 if proc.exitstatus == 0:
1498 return proc
1499 else:
1500 tries += 1
1501 if (tries * 5) > timeout:
1502 raise Exception('timed out waiting for admin_socket '
1503 'to appear after {type}.{id} restart'.
1504 format(type=service_type,
1505 id=service_id))
1506 self.log("waiting on admin_socket for {type}-{id}, "
1507 "{command}".format(type=service_type,
1508 id=service_id,
1509 command=args))
1510 time.sleep(5)
1511
1512 def get_pool_dump(self, pool):
1513 """
1514 get the osd dump part of a pool
1515 """
1516 osd_dump = self.get_osd_dump_json()
1517 for i in osd_dump['pools']:
1518 if i['pool_name'] == pool:
1519 return i
1520 assert False
1521
1522 def get_config(self, service_type, service_id, name):
1523 """
1524 :param service_type, service_id: daemon to query, e.g. 'mon', 'a'
1525 :param name: the option name
1526 """
1527 proc = self.wait_run_admin_socket(service_type, service_id,
1528 ['config', 'show'])
1529 j = json.loads(proc.stdout.getvalue())
1530 return j[name]
1531
1532 def set_config(self, osdnum, **argdict):
1533 """
1534 :param osdnum: osd number
1535 :param argdict: dictionary containing values to set.
1536 """
1537 for k, v in argdict.iteritems():
1538 self.wait_run_admin_socket(
1539 'osd', osdnum,
1540 ['config', 'set', str(k), str(v)])
1541
1542 def raw_cluster_status(self):
1543 """
1544 Get status from cluster
1545 """
1546 status = self.raw_cluster_cmd('status', '--format=json-pretty')
1547 return json.loads(status)
1548
1549 def raw_osd_status(self):
1550 """
1551 Get osd status from cluster
1552 """
1553 return self.raw_cluster_cmd('osd', 'dump')
1554
1555 def get_osd_status(self):
1556 """
1557 Get osd statuses sorted by states that the osds are in.
1558 """
1559 osd_lines = filter(
1560 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1561 self.raw_osd_status().split('\n'))
1562 self.log(osd_lines)
1563 in_osds = [int(i[4:].split()[0])
1564 for i in filter(lambda x: " in " in x, osd_lines)]
1565 out_osds = [int(i[4:].split()[0])
1566 for i in filter(lambda x: " out " in x, osd_lines)]
1567 up_osds = [int(i[4:].split()[0])
1568 for i in filter(lambda x: " up " in x, osd_lines)]
1569 down_osds = [int(i[4:].split()[0])
1570 for i in filter(lambda x: " down " in x, osd_lines)]
1571 dead_osds = [int(x.id_)
1572 for x in filter(lambda x:
1573 not x.running(),
1574 self.ctx.daemons.
1575 iter_daemons_of_role('osd', self.cluster))]
1576 live_osds = [int(x.id_) for x in
1577 filter(lambda x:
1578 x.running(),
1579 self.ctx.daemons.iter_daemons_of_role('osd',
1580 self.cluster))]
1581 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1582 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1583 'raw': osd_lines}
1584
1585 def get_num_pgs(self):
1586 """
1587 Check cluster status for the number of pgs
1588 """
1589 status = self.raw_cluster_status()
1590 self.log(status)
1591 return status['pgmap']['num_pgs']
1592
1593 def create_erasure_code_profile(self, profile_name, profile):
1594 """
1595 Create an erasure code profile name that can be used as a parameter
1596 when creating an erasure coded pool.
1597 """
1598 with self.lock:
1599 args = cmd_erasure_code_profile(profile_name, profile)
1600 self.raw_cluster_cmd(*args)
1601
1602 def create_pool_with_unique_name(self, pg_num=16,
1603 erasure_code_profile_name=None,
1604 min_size=None,
1605 erasure_code_use_overwrites=False):
1606 """
1607 Create a pool named unique_pool_X where X is unique.
1608 """
1609 name = ""
1610 with self.lock:
1611 name = "unique_pool_%s" % (str(self.next_pool_id),)
1612 self.next_pool_id += 1
1613 self.create_pool(
1614 name,
1615 pg_num,
1616 erasure_code_profile_name=erasure_code_profile_name,
1617 min_size=min_size,
1618 erasure_code_use_overwrites=erasure_code_use_overwrites)
1619 return name
1620
1621 @contextlib.contextmanager
1622 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1623 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1624 yield
1625 self.remove_pool(pool_name)
1626
1627 def create_pool(self, pool_name, pg_num=16,
1628 erasure_code_profile_name=None,
1629 min_size=None,
1630 erasure_code_use_overwrites=False):
1631 """
1632 Create a pool named from the pool_name parameter.
1633 :param pool_name: name of the pool being created.
1634 :param pg_num: initial number of pgs.
1635 :param erasure_code_profile_name: if set and !None create an
1636 erasure coded pool using the profile
1637 :param erasure_code_use_overwrites: if true, allow overwrites
1638 """
1639 with self.lock:
1640 assert isinstance(pool_name, basestring)
1641 assert isinstance(pg_num, int)
1642 assert pool_name not in self.pools
1643 self.log("creating pool_name %s" % (pool_name,))
1644 if erasure_code_profile_name:
1645 self.raw_cluster_cmd('osd', 'pool', 'create',
1646 pool_name, str(pg_num), str(pg_num),
1647 'erasure', erasure_code_profile_name)
1648 else:
1649 self.raw_cluster_cmd('osd', 'pool', 'create',
1650 pool_name, str(pg_num))
1651 if min_size is not None:
1652 self.raw_cluster_cmd(
1653 'osd', 'pool', 'set', pool_name,
1654 'min_size',
1655 str(min_size))
1656 if erasure_code_use_overwrites:
1657 self.raw_cluster_cmd(
1658 'osd', 'pool', 'set', pool_name,
1659 'allow_ec_overwrites',
1660 'true')
1661 self.raw_cluster_cmd(
1662 'osd', 'pool', 'application', 'enable',
1663 pool_name, 'rados', '--yes-i-really-mean-it',
1664 run.Raw('||'), 'true')
1665 self.pools[pool_name] = pg_num
1666 time.sleep(1)
1667
1668 def add_pool_snap(self, pool_name, snap_name):
1669 """
1670 Add pool snapshot
1671 :param pool_name: name of pool to snapshot
1672 :param snap_name: name of snapshot to take
1673 """
1674 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1675 str(pool_name), str(snap_name))
1676
1677 def remove_pool_snap(self, pool_name, snap_name):
1678 """
1679 Remove pool snapshot
1680 :param pool_name: name of pool to snapshot
1681 :param snap_name: name of snapshot to remove
1682 """
1683 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1684 str(pool_name), str(snap_name))
1685
1686 def remove_pool(self, pool_name):
1687 """
1688 Remove the indicated pool
1689 :param pool_name: Pool to be removed
1690 """
1691 with self.lock:
1692 assert isinstance(pool_name, basestring)
1693 assert pool_name in self.pools
1694 self.log("removing pool_name %s" % (pool_name,))
1695 del self.pools[pool_name]
1696 self.do_rados(self.controller,
1697 ['rmpool', pool_name, pool_name,
1698 "--yes-i-really-really-mean-it"])
1699
1700 def get_pool(self):
1701 """
1702 Pick a random pool
1703 """
1704 with self.lock:
1705 return random.choice(self.pools.keys())
1706
1707 def get_pool_pg_num(self, pool_name):
1708 """
1709 Return the number of pgs in the pool specified.
1710 """
1711 with self.lock:
1712 assert isinstance(pool_name, basestring)
1713 if pool_name in self.pools:
1714 return self.pools[pool_name]
1715 return 0
1716
1717 def get_pool_property(self, pool_name, prop):
1718 """
1719 :param pool_name: pool
1720 :param prop: property to be checked.
1721 :returns: property as an int value.
1722 """
1723 with self.lock:
1724 assert isinstance(pool_name, basestring)
1725 assert isinstance(prop, basestring)
1726 output = self.raw_cluster_cmd(
1727 'osd',
1728 'pool',
1729 'get',
1730 pool_name,
1731 prop)
1732 return int(output.split()[1])
1733
1734 def set_pool_property(self, pool_name, prop, val):
1735 """
1736 :param pool_name: pool
1737 :param prop: property to be set.
1738 :param val: value to set.
1739
1740 This routine retries if set operation fails.
1741 """
1742 with self.lock:
1743 assert isinstance(pool_name, basestring)
1744 assert isinstance(prop, basestring)
1745 assert isinstance(val, int)
1746 tries = 0
1747 while True:
1748 r = self.raw_cluster_cmd_result(
1749 'osd',
1750 'pool',
1751 'set',
1752 pool_name,
1753 prop,
1754 str(val))
1755 if r != 11: # EAGAIN
1756 break
1757 tries += 1
1758 if tries > 50:
1759 raise Exception('timed out getting EAGAIN '
1760 'when setting pool property %s %s = %s' %
1761 (pool_name, prop, val))
1762 self.log('got EAGAIN setting pool property, '
1763 'waiting a few seconds...')
1764 time.sleep(2)
1765
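    # Note on the retry loop above: 'osd pool set' can return EAGAIN (11) while
    # the cluster is still converging, so the helper retries up to 50 times with
    # a 2 second pause before giving up. A caller simply invokes it and relies on
    # that retry, e.g. (hypothetical pool name):
    #
    #     manager.set_pool_property('rbd_test_pool', 'pg_num', 32)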
1766 def expand_pool(self, pool_name, by, max_pgs):
1767 """
1768 Increase the number of pgs in a pool
1769 """
1770 with self.lock:
1771 assert isinstance(pool_name, basestring)
1772 assert isinstance(by, int)
1773 assert pool_name in self.pools
1774 if self.get_num_creating() > 0:
1775 return False
1776 if (self.pools[pool_name] + by) > max_pgs:
1777 return False
1778 self.log("increase pool size by %d" % (by,))
1779 new_pg_num = self.pools[pool_name] + by
1780 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1781 self.pools[pool_name] = new_pg_num
1782 return True
1783
1784 def set_pool_pgpnum(self, pool_name, force):
1785 """
1786 Set pgpnum property of pool_name pool.
1787 """
1788 with self.lock:
1789 assert isinstance(pool_name, basestring)
1790 assert pool_name in self.pools
1791 if not force and self.get_num_creating() > 0:
1792 return False
1793 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1794 return True
1795
1796 def list_pg_missing(self, pgid):
1797 """
1798 Return the list of missing objects for the pg with the id specified
1799 """
1800 r = None
1801 offset = {}
1802 while True:
1803 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
1804 json.dumps(offset))
1805 j = json.loads(out)
1806 if r is None:
1807 r = j
1808 else:
1809 r['objects'].extend(j['objects'])
1810 if 'more' not in j:
1811 break
1812 if j['more'] == 0:
1813 break
1814 offset = j['objects'][-1]['oid']
1815 if 'more' in r:
1816 del r['more']
1817 return r
1818
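    # The loop above pages through 'pg <pgid> list_missing' output: each call is
    # given the oid of the last object seen as the offset, and iteration stops
    # once the reported 'more' count drops to zero (or the key disappears). A
    # sketch of the aggregated result shape (field values illustrative only):
    #
    #     {'num_missing': 2, 'num_unfound': 2,
    #      'objects': [{'oid': {...}, 'need': '...', 'have': '...'}, ...]}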
1819 def get_pg_stats(self):
1820 """
1821 Dump the cluster and get pg stats
1822 """
1823 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
1824 j = json.loads('\n'.join(out.split('\n')[1:]))
1825 return j['pg_stats']
1826
1827 def get_pgids_to_force(self, backfill):
1828 """
1829 Return the randomized list of PGs that can have their recovery/backfill forced
1830 """
1831 j = self.get_pg_stats()
1832 pgids = []
1833 if backfill:
1834 wanted = ['degraded', 'backfilling', 'backfill_wait']
1835 else:
1836 wanted = ['recovering', 'degraded', 'recovery_wait']
1837 for pg in j:
1838 status = pg['state'].split('+')
1839 for t in wanted:
1840 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
1841 pgids.append(pg['pgid'])
1842 break
1843 return pgids
1844
1845 def get_pgids_to_cancel_force(self, backfill):
1846 """
1847 Return the randomized list of PGs whose recovery/backfill priority is forced
1848 """
1849 j = self.get_pg_stats()
1850 pgids = []
1851 if backfill:
1852 wanted = 'forced_backfill'
1853 else:
1854 wanted = 'forced_recovery'
1855 for pg in j:
1856 status = pg['state'].split('+')
1857 if wanted in status and random.random() > 0.5:
1858 pgids.append(pg['pgid'])
1859 return pgids
1860
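    # Both helpers above sample PGs at random (roughly half of the candidates on
    # each pass) so repeated thrasher iterations exercise different PGs: one
    # returns PGs eligible to have recovery/backfill forced, the other returns
    # PGs whose priority is already forced and could be cancelled.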
1861 def compile_pg_status(self):
1862 """
1863 Return a histogram of pg state values
1864 """
1865 ret = {}
1866 j = self.get_pg_stats()
1867 for pg in j:
1868 for status in pg['state'].split('+'):
1869 if status not in ret:
1870 ret[status] = 0
1871 ret[status] += 1
1872 return ret
1873
1874 @wait_for_pg_stats
1875 def with_pg_state(self, pool, pgnum, check):
1876 pgstr = self.get_pgid(pool, pgnum)
1877 stats = self.get_single_pg_stats(pgstr)
1878 assert(check(stats['state']))
1879
1880 @wait_for_pg_stats
1881 def with_pg(self, pool, pgnum, check):
1882 pgstr = self.get_pgid(pool, pgnum)
1883 stats = self.get_single_pg_stats(pgstr)
1884 return check(stats)
1885
1886 def get_last_scrub_stamp(self, pool, pgnum):
1887 """
1888 Get the timestamp of the last scrub.
1889 """
1890 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
1891 return stats["last_scrub_stamp"]
1892
1893 def do_pg_scrub(self, pool, pgnum, stype):
1894 """
1895 Scrub pg and wait for scrubbing to finish
1896 """
1897 init = self.get_last_scrub_stamp(pool, pgnum)
1898 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
1899 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1900 SLEEP_TIME = 10
1901 timer = 0
1902 while init == self.get_last_scrub_stamp(pool, pgnum):
1903 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1904 self.log("waiting for scrub type %s" % (stype,))
1905 if (timer % RESEND_TIMEOUT) == 0:
1906 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1907 # The first time in this loop is the actual request
1908 if timer != 0 and stype == "repair":
1909 self.log("WARNING: Resubmitted a non-idempotent repair")
1910 time.sleep(SLEEP_TIME)
1911 timer += SLEEP_TIME
1912
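    # Timing sketch for do_pg_scrub(): the scrub/repair request is (re)sent every
    # RESEND_TIMEOUT seconds while last_scrub_stamp is unchanged, and the whole
    # wait is capped at FATAL_TIMEOUT. An example call (pool name and pg number
    # are hypothetical):
    #
    #     manager.do_pg_scrub('rbd', 0, 'deep-scrub')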
1913 def wait_snap_trimming_complete(self, pool):
1914 """
1915 Wait for snap trimming on pool to end
1916 """
1917 POLL_PERIOD = 10
1918 FATAL_TIMEOUT = 600
1919 start = time.time()
1920 poolnum = self.get_pool_num(pool)
1921 poolnumstr = "%s." % (poolnum,)
1922 while True:
1923 now = time.time()
1924 if (now - start) > FATAL_TIMEOUT:
1925 assert (now - start) < FATAL_TIMEOUT, \
1926 'failed to complete snap trimming before timeout'
1927 all_stats = self.get_pg_stats()
1928 trimming = False
1929 for pg in all_stats:
1930 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
1931 self.log("pg {pg} in trimming, state: {state}".format(
1932 pg=pg['pgid'],
1933 state=pg['state']))
1934 trimming = True
1935 if not trimming:
1936 break
1937 self.log("{pool} still trimming, waiting".format(pool=pool))
1938 time.sleep(POLL_PERIOD)
1939
1940 def get_single_pg_stats(self, pgid):
1941 """
1942 Return pg for the pgid specified.
1943 """
1944 all_stats = self.get_pg_stats()
1945
1946 for pg in all_stats:
1947 if pg['pgid'] == pgid:
1948 return pg
1949
1950 return None
1951
1952 def get_object_pg_with_shard(self, pool, name, osdid):
1953 """Return the pgid that maps the named object; for erasure coded
1954 pools the acting-set shard index of the given osd is appended."""
1955 pool_dump = self.get_pool_dump(pool)
1956 object_map = self.get_object_map(pool, name)
1957 if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1958 shard = object_map['acting'].index(osdid)
1959 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1960 shard=shard)
1961 else:
1962 return object_map['pgid']
1963
1964 def get_object_primary(self, pool, name):
1965 """Return the id of the acting primary osd for the named object."""
1966
1967 object_map = self.get_object_map(pool, name)
1968 return object_map['acting_primary']
1969
1970 def get_object_map(self, pool, name):
1971 """
1972 osd map --format=json converted to a python object
1973 :returns: the python object
1974 """
1975 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
1976 return json.loads('\n'.join(out.split('\n')[1:]))
1977
1978 def get_osd_dump_json(self):
1979 """
1980 osd dump --format=json converted to a python object
1981 :returns: the python object
1982 """
1983 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
1984 return json.loads('\n'.join(out.split('\n')[1:]))
1985
1986 def get_osd_dump(self):
1987 """
1988 Dump osds
1989 :returns: all osds
1990 """
1991 return self.get_osd_dump_json()['osds']
1992
1993 def get_mgr_dump(self):
1994 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
1995 return json.loads(out)
1996
1997 def get_stuck_pgs(self, type_, threshold):
1998 """
1999 :returns: stuck pg information from the cluster
2000 """
2001 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2002 '--format=json')
2003 return json.loads(out)
2004
2005 def get_num_unfound_objects(self):
2006 """
2007 Check cluster status to get the number of unfound objects
2008 """
2009 status = self.raw_cluster_status()
2010 self.log(status)
2011 return status['pgmap'].get('unfound_objects', 0)
2012
2013 def get_num_creating(self):
2014 """
2015 Find the number of pgs in creating mode.
2016 """
2017 pgs = self.get_pg_stats()
2018 num = 0
2019 for pg in pgs:
2020 if 'creating' in pg['state']:
2021 num += 1
2022 return num
2023
2024 def get_num_active_clean(self):
2025 """
2026 Find the number of active and clean pgs.
2027 """
2028 pgs = self.get_pg_stats()
2029 num = 0
2030 for pg in pgs:
2031 if (pg['state'].count('active') and
2032 pg['state'].count('clean') and
2033 not pg['state'].count('stale')):
2034 num += 1
2035 return num
2036
2037 def get_num_active_recovered(self):
2038 """
2039 Find the number of active and recovered pgs.
2040 """
2041 pgs = self.get_pg_stats()
2042 num = 0
2043 for pg in pgs:
2044 if (pg['state'].count('active') and
2045 not pg['state'].count('recover') and
2046 not pg['state'].count('backfill') and
2047 not pg['state'].count('stale')):
2048 num += 1
2049 return num
2050
2051 def get_is_making_recovery_progress(self):
2052 """
2053 Return whether there is recovery progress discernible in the
2054 raw cluster status
2055 """
2056 status = self.raw_cluster_status()
2057 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2058 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2059 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2060 return kps > 0 or bps > 0 or ops > 0
2061
2062 def get_num_active(self):
2063 """
2064 Find the number of active pgs.
2065 """
2066 pgs = self.get_pg_stats()
2067 num = 0
2068 for pg in pgs:
2069 if pg['state'].count('active') and not pg['state'].count('stale'):
2070 num += 1
2071 return num
2072
2073 def get_num_down(self):
2074 """
2075 Find the number of pgs that are down.
2076 """
2077 pgs = self.get_pg_stats()
2078 num = 0
2079 for pg in pgs:
2080 if ((pg['state'].count('down') and not
2081 pg['state'].count('stale')) or
2082 (pg['state'].count('incomplete') and not
2083 pg['state'].count('stale'))):
2084 num += 1
2085 return num
2086
2087 def get_num_active_down(self):
2088 """
2089 Find the number of pgs that are either active or down.
2090 """
2091 pgs = self.get_pg_stats()
2092 num = 0
2093 for pg in pgs:
2094 if ((pg['state'].count('active') and not
2095 pg['state'].count('stale')) or
2096 (pg['state'].count('down') and not
2097 pg['state'].count('stale')) or
2098 (pg['state'].count('incomplete') and not
2099 pg['state'].count('stale'))):
2100 num += 1
2101 return num
2102
2103 def is_clean(self):
2104 """
2105 True if all pgs are clean
2106 """
2107 return self.get_num_active_clean() == self.get_num_pgs()
2108
2109 def is_recovered(self):
2110 """
2111 True if all pgs have recovered
2112 """
2113 return self.get_num_active_recovered() == self.get_num_pgs()
2114
2115 def is_active_or_down(self):
2116 """
2117 True if all pgs are active or down
2118 """
2119 return self.get_num_active_down() == self.get_num_pgs()
2120
2121 def wait_for_clean(self, timeout=None):
2122 """
2123 Returns true when all pgs are clean.
2124 """
2125 self.log("waiting for clean")
2126 start = time.time()
2127 num_active_clean = self.get_num_active_clean()
2128 while not self.is_clean():
2129 if timeout is not None:
2130 if self.get_is_making_recovery_progress():
2131 self.log("making progress, resetting timeout")
2132 start = time.time()
2133 else:
2134 self.log("no progress seen, keeping timeout for now")
2135 if time.time() - start >= timeout:
2136 self.log('dumping pgs')
2137 out = self.raw_cluster_cmd('pg', 'dump')
2138 self.log(out)
2139 assert time.time() - start < timeout, \
2140 'failed to become clean before timeout expired'
2141 cur_active_clean = self.get_num_active_clean()
2142 if cur_active_clean != num_active_clean:
2143 start = time.time()
2144 num_active_clean = cur_active_clean
2145 time.sleep(3)
2146 self.log("clean!")
2147
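    # wait_for_clean() deliberately resets its timeout window whenever it sees
    # recovery throughput or a change in the active+clean count, so the timeout
    # only fires when the cluster is genuinely stuck. A typical (hypothetical)
    # caller pattern:
    #
    #     manager.mark_out_osd(2)
    #     manager.wait_for_clean(timeout=1200)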
2148 def are_all_osds_up(self):
2149 """
2150 Returns true if all osds are up.
2151 """
2152 x = self.get_osd_dump()
2153 return (len(x) == sum([(y['up'] > 0) for y in x]))
2154
2155 def wait_for_all_osds_up(self, timeout=None):
2156 """
2157 When this exits, either the timeout has expired, or all
2158 osds are up.
2159 """
2160 self.log("waiting for all up")
2161 start = time.time()
2162 while not self.are_all_osds_up():
2163 if timeout is not None:
2164 assert time.time() - start < timeout, \
2165 'timeout expired in wait_for_all_osds_up'
2166 time.sleep(3)
2167 self.log("all up!")
2168
2169 def pool_exists(self, pool):
2170 if pool in self.list_pools():
2171 return True
2172 return False
2173
2174 def wait_for_pool(self, pool, timeout=300):
2175 """
2176 Wait for a pool to exist
2177 """
2178 self.log('waiting for pool %s to exist' % pool)
2179 start = time.time()
2180 while not self.pool_exists(pool):
2181 if timeout is not None:
2182 assert time.time() - start < timeout, \
2183 'timeout expired in wait_for_pool'
2184 time.sleep(3)
2185
2186 def wait_for_pools(self, pools):
2187 for pool in pools:
2188 self.wait_for_pool(pool)
2189
2190 def is_mgr_available(self):
2191 x = self.get_mgr_dump()
2192 return x.get('available', False)
2193
2194 def wait_for_mgr_available(self, timeout=None):
2195 self.log("waiting for mgr available")
2196 start = time.time()
2197 while not self.is_mgr_available():
2198 if timeout is not None:
2199 assert time.time() - start < timeout, \
2200 'timeout expired in wait_for_mgr_available'
2201 time.sleep(3)
2202 self.log("mgr available!")
2203
2204 def wait_for_recovery(self, timeout=None):
2205 """
2206 Check peering. When this exits, we have recovered.
2207 """
2208 self.log("waiting for recovery to complete")
2209 start = time.time()
2210 num_active_recovered = self.get_num_active_recovered()
2211 while not self.is_recovered():
2212 now = time.time()
2213 if timeout is not None:
2214 if self.get_is_making_recovery_progress():
2215 self.log("making progress, resetting timeout")
2216 start = time.time()
2217 else:
2218 self.log("no progress seen, keeping timeout for now")
2219 if now - start >= timeout:
2220 self.log('dumping pgs')
2221 out = self.raw_cluster_cmd('pg', 'dump')
2222 self.log(out)
2223 assert now - start < timeout, \
2224 'failed to recover before timeout expired'
2225 cur_active_recovered = self.get_num_active_recovered()
2226 if cur_active_recovered != num_active_recovered:
2227 start = time.time()
2228 num_active_recovered = cur_active_recovered
2229 time.sleep(3)
2230 self.log("recovered!")
2231
2232 def wait_for_active(self, timeout=None):
2233 """
2234 Check peering. When this exits, we are definitely active
2235 """
2236 self.log("waiting for peering to complete")
2237 start = time.time()
2238 num_active = self.get_num_active()
2239 while not self.is_active():
2240 if timeout is not None:
2241 if time.time() - start >= timeout:
2242 self.log('dumping pgs')
2243 out = self.raw_cluster_cmd('pg', 'dump')
2244 self.log(out)
2245 assert time.time() - start < timeout, \
2246 'failed to recover before timeout expired'
2247 cur_active = self.get_num_active()
2248 if cur_active != num_active:
2249 start = time.time()
2250 num_active = cur_active
2251 time.sleep(3)
2252 self.log("active!")
2253
2254 def wait_for_active_or_down(self, timeout=None):
2255 """
2256 Check peering. When this exits, we are definitely either
2257 active or down
2258 """
2259 self.log("waiting for peering to complete or become blocked")
2260 start = time.time()
2261 num_active_down = self.get_num_active_down()
2262 while not self.is_active_or_down():
2263 if timeout is not None:
2264 if time.time() - start >= timeout:
2265 self.log('dumping pgs')
2266 out = self.raw_cluster_cmd('pg', 'dump')
2267 self.log(out)
2268 assert time.time() - start < timeout, \
2269 'failed to recover before timeout expired'
2270 cur_active_down = self.get_num_active_down()
2271 if cur_active_down != num_active_down:
2272 start = time.time()
2273 num_active_down = cur_active_down
2274 time.sleep(3)
2275 self.log("active or down!")
2276
2277 def osd_is_up(self, osd):
2278 """
2279 Wrapper for osd check
2280 """
2281 osds = self.get_osd_dump()
2282 return osds[osd]['up'] > 0
2283
2284 def wait_till_osd_is_up(self, osd, timeout=None):
2285 """
2286 Loop waiting for osd.
2287 """
2288 self.log('waiting for osd.%d to be up' % osd)
2289 start = time.time()
2290 while not self.osd_is_up(osd):
2291 if timeout is not None:
2292 assert time.time() - start < timeout, \
2293 'osd.%d failed to come up before timeout expired' % osd
2294 time.sleep(3)
2295 self.log('osd.%d is up' % osd)
2296
2297 def is_active(self):
2298 """
2299 Wrapper to check if all pgs are active
2300 """
2301 return self.get_num_active() == self.get_num_pgs()
2302
2303 def wait_till_active(self, timeout=None):
2304 """
2305 Wait until all pgs are active.
2306 """
2307 self.log("waiting till active")
2308 start = time.time()
2309 while not self.is_active():
2310 if timeout is not None:
2311 if time.time() - start >= timeout:
2312 self.log('dumping pgs')
2313 out = self.raw_cluster_cmd('pg', 'dump')
2314 self.log(out)
2315 assert time.time() - start < timeout, \
2316 'failed to become active before timeout expired'
2317 time.sleep(3)
2318 self.log("active!")
2319
2320 def mark_out_osd(self, osd):
2321 """
2322 Wrapper to mark osd out.
2323 """
2324 self.raw_cluster_cmd('osd', 'out', str(osd))
2325
2326 def kill_osd(self, osd):
2327 """
2328 Kill osds by either power cycling (if indicated by the config)
2329 or by stopping.
2330 """
2331 if self.config.get('powercycle'):
2332 remote = self.find_remote('osd', osd)
2333 self.log('kill_osd on osd.{o} '
2334 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2335 self._assert_ipmi(remote)
2336 remote.console.power_off()
2337 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2338 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2339 self.raw_cluster_cmd(
2340 '--', 'tell', 'osd.%d' % osd,
2341 'injectargs',
2342 '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
2343 )
2344 try:
2345 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2346 except:
2347 pass
2348 else:
2349 raise RuntimeError('osd.%s did not fail' % osd)
2350 else:
2351 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2352 else:
2353 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2354
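    # kill_osd() picks one of three strategies based on the task config: an IPMI
    # power-cycle ('powercycle'), a BlueStore crash injected via
    # 'bdev_inject_crash', or a plain daemon stop. A hypothetical thrasher config
    # fragment selecting the crash-injection path might look like:
    #
    #     config = {'bdev_inject_crash': 2, 'bdev_inject_crash_probability': 0.5}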
2355 @staticmethod
2356 def _assert_ipmi(remote):
2357 assert remote.console.has_ipmi_credentials, (
2358 "powercycling requested but RemoteConsole is not "
2359 "initialized. Check ipmi config.")
2360
2361 def blackhole_kill_osd(self, osd):
2362 """
2363 Stop osd if nothing else works.
2364 """
2365 self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
2366 'injectargs',
2367 '--objectstore-blackhole')
2368 time.sleep(2)
2369 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2370
2371 def revive_osd(self, osd, timeout=150, skip_admin_check=False):
2372 """
2373 Revive osds by either power cycling (if indicated by the config)
2374 or by restarting.
2375 """
2376 if self.config.get('powercycle'):
2377 remote = self.find_remote('osd', osd)
2378 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2379 format(o=osd, s=remote.name))
2380 self._assert_ipmi(remote)
2381 remote.console.power_on()
2382 if not remote.console.check_status(300):
2383 raise Exception('Failed to revive osd.{o} via ipmi'.
2384 format(o=osd))
2385 teuthology.reconnect(self.ctx, 60, [remote])
2386 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2387 self.make_admin_daemon_dir(remote)
2388 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2389 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2390
2391 if not skip_admin_check:
2392 # wait for dump_ops_in_flight; this command doesn't appear
2393 # until after the signal handler is installed and it is safe
2394 # to stop the osd again without making valgrind leak checks
2395 # unhappy. see #5924.
2396 self.wait_run_admin_socket('osd', osd,
2397 args=['dump_ops_in_flight'],
2398 timeout=timeout, stdout=DEVNULL)
2399
2400 def mark_down_osd(self, osd):
2401 """
2402 Cluster command wrapper
2403 """
2404 self.raw_cluster_cmd('osd', 'down', str(osd))
2405
2406 def mark_in_osd(self, osd):
2407 """
2408 Cluster command wrapper
2409 """
2410 self.raw_cluster_cmd('osd', 'in', str(osd))
2411
2412 def signal_osd(self, osd, sig, silent=False):
2413 """
2414 Wrapper to local get_daemon call which sends the given
2415 signal to the given osd.
2416 """
2417 self.ctx.daemons.get_daemon('osd', osd,
2418 self.cluster).signal(sig, silent=silent)
2419
2420 ## monitors
2421 def signal_mon(self, mon, sig, silent=False):
2422 """
2423 Wrapper to local get_daemon call
2424 """
2425 self.ctx.daemons.get_daemon('mon', mon,
2426 self.cluster).signal(sig, silent=silent)
2427
2428 def kill_mon(self, mon):
2429 """
2430 Kill the monitor by either power cycling (if the config says so),
2431 or by doing a stop.
2432 """
2433 if self.config.get('powercycle'):
2434 remote = self.find_remote('mon', mon)
2435 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2436 format(m=mon, s=remote.name))
2437 self._assert_ipmi(remote)
2438 remote.console.power_off()
2439 else:
2440 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2441
2442 def revive_mon(self, mon):
2443 """
2444 Restart by either power cycling (if the config says so),
2445 or by doing a normal restart.
2446 """
2447 if self.config.get('powercycle'):
2448 remote = self.find_remote('mon', mon)
2449 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2450 format(m=mon, s=remote.name))
2451 self._assert_ipmi(remote)
2452 remote.console.power_on()
2453 self.make_admin_daemon_dir(remote)
2454 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2455
2456 def revive_mgr(self, mgr):
2457 """
2458 Restart by either power cycling (if the config says so),
2459 or by doing a normal restart.
2460 """
2461 if self.config.get('powercycle'):
2462 remote = self.find_remote('mgr', mgr)
2463 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2464 format(m=mgr, s=remote.name))
2465 self._assert_ipmi(remote)
2466 remote.console.power_on()
2467 self.make_admin_daemon_dir(remote)
2468 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2469
2470 def get_mon_status(self, mon):
2471 """
2472 Extract all the monitor status information from the cluster
2473 """
2474 addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
2475 out = self.raw_cluster_cmd('-m', addr, 'mon_status')
2476 return json.loads(out)
2477
2478 def get_mon_quorum(self):
2479 """
2480 Extract monitor quorum information from the cluster
2481 """
2482 out = self.raw_cluster_cmd('quorum_status')
2483 j = json.loads(out)
2484 self.log('quorum_status is %s' % out)
2485 return j['quorum']
2486
2487 def wait_for_mon_quorum_size(self, size, timeout=300):
2488 """
2489 Loop until quorum size is reached.
2490 """
2491 self.log('waiting for quorum size %d' % size)
2492 start = time.time()
2493 while not len(self.get_mon_quorum()) == size:
2494 if timeout is not None:
2495 assert time.time() - start < timeout, \
2496 ('failed to reach quorum size %d '
2497 'before timeout expired' % size)
2498 time.sleep(3)
2499 self.log("quorum is size %d" % size)
2500
2501 def get_mon_health(self, debug=False):
2502 """
2503 Extract all the monitor health information.
2504 """
2505 out = self.raw_cluster_cmd('health', '--format=json')
2506 if debug:
2507 self.log('health:\n{h}'.format(h=out))
2508 return json.loads(out)
2509
2510 def get_mds_status(self, mds):
2511 """
2512 Run cluster commands for the mds in order to get mds information
2513 """
2514 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
2515 j = json.loads(' '.join(out.splitlines()[1:]))
2516 # collate; for dup ids, larger gid wins.
2517 for info in j['info'].itervalues():
2518 if info['name'] == mds:
2519 return info
2520 return None
2521
2522 def get_filepath(self):
2523 """
2524 Return path to osd data with {id} needing to be replaced
2525 """
2526 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2527
2528 def make_admin_daemon_dir(self, remote):
2529 """
2530 Create /var/run/ceph directory on remote site.
2531
2532 :param remote: Remote site on which the directory is created
2533
2534 """
2535 remote.run(args=['sudo',
2536 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2537
2538
2539def utility_task(name):
2540 """
2541 Generate ceph_manager subtask corresponding to ceph_manager
2542 method name
2543 """
2544 def task(ctx, config):
2545 if config is None:
2546 config = {}
2547 args = config.get('args', [])
2548 kwargs = config.get('kwargs', {})
2549 cluster = config.get('cluster', 'ceph')
2550 fn = getattr(ctx.managers[cluster], name)
2551 fn(*args, **kwargs)
2552 return task
2553
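# The module-level tasks below are thin wrappers generated by utility_task(), so
# a teuthology job can invoke CephManager methods directly from yaml. A
# hypothetical job fragment (task arguments are illustrative only):
#
#     tasks:
#     - ceph_manager.create_pool:
#         args: ['mypool']
#         kwargs: {pg_num: 32}
#     - ceph_manager.wait_for_clean: null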
2554revive_osd = utility_task("revive_osd")
2555revive_mon = utility_task("revive_mon")
2556kill_osd = utility_task("kill_osd")
2557kill_mon = utility_task("kill_mon")
2558create_pool = utility_task("create_pool")
2559remove_pool = utility_task("remove_pool")
2560wait_for_clean = utility_task("wait_for_clean")
2561flush_all_pg_stats = utility_task("flush_all_pg_stats")
2562set_pool_property = utility_task("set_pool_property")
2563do_pg_scrub = utility_task("do_pg_scrub")
2564wait_for_pool = utility_task("wait_for_pool")
2565wait_for_pools = utility_task("wait_for_pools")