1"""
2ceph manager -- Thrasher and CephManager objects
3"""
4from cStringIO import StringIO
5from functools import wraps
6import contextlib
7import random
8import signal
9import time
10import gevent
11import base64
12import json
13import logging
14import threading
15import traceback
16import os
17from teuthology import misc as teuthology
18from tasks.scrub import Scrubber
19from util.rados import cmd_erasure_code_profile
20from util import get_remote
21from teuthology.contextutil import safe_while
22from teuthology.orchestra.remote import Remote
23from teuthology.orchestra import run
24from teuthology.exceptions import CommandFailedError
25
26try:
27 from subprocess import DEVNULL # py3k
28except ImportError:
29 DEVNULL = open(os.devnull, 'r+')
30
31DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33log = logging.getLogger(__name__)
34
35
36def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
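    """
    Serialize the in-memory conf for the named cluster (ctx.ceph[cluster].conf)
    and write it to conf_path on every remote in the cluster.
    """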
37 conf_fp = StringIO()
38 ctx.ceph[cluster].conf.write(conf_fp)
39 conf_fp.seek(0)
40 writes = ctx.cluster.run(
41 args=[
42 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
44 'sudo', 'python',
45 '-c',
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
48 conf_path,
49 run.Raw('&&'),
50 'sudo', 'chmod', '0644', conf_path,
51 ],
52 stdin=run.PIPE,
53 wait=False)
54 teuthology.feed_many_stdins_and_close(conf_fp, writes)
55 run.wait(writes)
56
57
58def mount_osd_data(ctx, remote, cluster, osd):
59 """
60 Mount a remote OSD
61
62 :param ctx: Context
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
65 :param osd: Osd name
66 """
67 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68 role = "{0}.osd.{1}".format(cluster, osd)
69 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70 if remote in ctx.disk_config.remote_to_roles_to_dev:
71 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
72 role = alt_role
73 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
74 return
75 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76 mount_options = ctx.disk_config.\
77 remote_to_roles_to_dev_mount_options[remote][role]
78 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
80
    log.info('Mounting osd.{o} on {n}: dev: {d}, cluster: {c}, '
             'mountpoint: {p}, type: {t}, options: {v}'.format(
                 o=osd, n=remote.name, d=dev, p=mnt, t=fstype,
                 v=mount_options, c=cluster))
85
86 remote.run(
87 args=[
88 'sudo',
89 'mount',
90 '-t', fstype,
91 '-o', ','.join(mount_options),
92 dev,
93 mnt,
94 ]
95 )
96
97
98class Thrasher:
99 """
100 Object used to thrash Ceph
101 """
102 def __init__(self, manager, config, logger=None):
103 self.ceph_manager = manager
104 self.cluster = manager.cluster
105 self.ceph_manager.wait_for_clean()
106 osd_status = self.ceph_manager.get_osd_status()
107 self.in_osds = osd_status['in']
108 self.live_osds = osd_status['live']
109 self.out_osds = osd_status['out']
110 self.dead_osds = osd_status['dead']
111 self.stopping = False
112 self.logger = logger
113 self.config = config
114 self.revive_timeout = self.config.get("revive_timeout", 150)
115 self.pools_to_fix_pgp_num = set()
116 if self.config.get('powercycle'):
117 self.revive_timeout += 120
118 self.clean_wait = self.config.get('clean_wait', 0)
119 self.minin = self.config.get("min_in", 3)
120 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121 self.sighup_delay = self.config.get('sighup_delay')
122 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123 self.dump_ops_enable = self.config.get('dump_ops_enable')
124 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
128
129 num_osds = self.in_osds + self.out_osds
130 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
131 if self.logger is not None:
132 self.log = lambda x: self.logger.info(x)
133 else:
134 def tmp(x):
135 """
136 Implement log behavior
137 """
138 print x
139 self.log = tmp
140 if self.config is None:
141 self.config = dict()
142 # prevent monitor from auto-marking things out while thrasher runs
143 # try both old and new tell syntax, in case we are testing old code
144 self.saved_options = []
145 # assuming that the default settings do not vary from one daemon to
146 # another
147 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
148 opts = [('mon', 'mon_osd_down_out_interval', 0)]
149 for service, opt, new_value in opts:
150 old_value = manager.get_config(first_mon[0],
151 first_mon[1],
152 opt)
153 self.saved_options.append((service, opt, old_value))
154 self._set_config(service, '*', opt, new_value)
155 # initialize ceph_objectstore_tool property - must be done before
156 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
157 if (self.config.get('powercycle') or
158 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
159 self.config.get('disable_objectstore_tool_tests', False)):
160 self.ceph_objectstore_tool = False
161 self.test_rm_past_intervals = False
162 if self.config.get('powercycle'):
163 self.log("Unable to test ceph-objectstore-tool, "
164 "powercycle testing")
165 else:
166 self.log("Unable to test ceph-objectstore-tool, "
167 "not available on all OSD nodes")
168 else:
169 self.ceph_objectstore_tool = \
170 self.config.get('ceph_objectstore_tool', True)
171 self.test_rm_past_intervals = \
172 self.config.get('test_rm_past_intervals', True)
173 # spawn do_thrash
174 self.thread = gevent.spawn(self.do_thrash)
175 if self.sighup_delay:
176 self.sighup_thread = gevent.spawn(self.do_sighup)
177 if self.optrack_toggle_delay:
178 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
179 if self.dump_ops_enable == "true":
180 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
181 if self.noscrub_toggle_delay:
182 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
183
184 def _set_config(self, service_type, service_id, name, value):
185 opt_arg = '--{name} {value}'.format(name=name, value=value)
186 try:
187 whom = '.'.join([service_type, service_id])
188 self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
189 'injectargs', opt_arg)
190 except Exception:
191 self.ceph_manager.raw_cluster_cmd('--', service_type,
192 'tell', service_id,
193 'injectargs', opt_arg)
194
195
196 def cmd_exists_on_osds(self, cmd):
197 allremotes = self.ceph_manager.ctx.cluster.only(\
198 teuthology.is_type('osd', self.cluster)).remotes.keys()
199 allremotes = list(set(allremotes))
200 for remote in allremotes:
201 proc = remote.run(args=['type', cmd], wait=True,
202 check_status=False, stdout=StringIO(),
203 stderr=StringIO())
204 if proc.exitstatus != 0:
                return False
        return True
207
208 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
209 """
210 :param osd: Osd to be killed.
211 :mark_down: Mark down if true.
212 :mark_out: Mark out if true.
213 """
214 if osd is None:
215 osd = random.choice(self.live_osds)
216 self.log("Killing osd %s, live_osds are %s" % (str(osd),
217 str(self.live_osds)))
218 self.live_osds.remove(osd)
219 self.dead_osds.append(osd)
220 self.ceph_manager.kill_osd(osd)
221 if mark_down:
222 self.ceph_manager.mark_down_osd(osd)
223 if mark_out and osd in self.in_osds:
224 self.out_osd(osd)
225 if self.ceph_objectstore_tool:
226 self.log("Testing ceph-objectstore-tool on down osd")
227 remote = self.ceph_manager.find_remote('osd', osd)
228 FSPATH = self.ceph_manager.get_filepath()
229 JPATH = os.path.join(FSPATH, "journal")
230 exp_osd = imp_osd = osd
231 exp_remote = imp_remote = remote
232 # If an older osd is available we'll move a pg from there
233 if (len(self.dead_osds) > 1 and
234 random.random() < self.chance_move_pg):
235 exp_osd = random.choice(self.dead_osds[:-1])
236 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
237 if ('keyvaluestore_backend' in
238 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
239 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
240 "--data-path {fpath} --journal-path {jpath} "
241 "--type keyvaluestore "
242 "--log-file="
243 "/var/log/ceph/objectstore_tool.\\$pid.log ".
244 format(fpath=FSPATH, jpath=JPATH))
245 else:
246 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
247 "--data-path {fpath} --journal-path {jpath} "
248 "--log-file="
249 "/var/log/ceph/objectstore_tool.\\$pid.log ".
250 format(fpath=FSPATH, jpath=JPATH))
251 cmd = (prefix + "--op list-pgs").format(id=exp_osd)
252
253 # ceph-objectstore-tool might be temporarily absent during an
254 # upgrade - see http://tracker.ceph.com/issues/18014
255 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
256 while proceed():
257 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
258 wait=True, check_status=False, stdout=StringIO(),
259 stderr=StringIO())
260 if proc.exitstatus == 0:
261 break
262 log.debug("ceph-objectstore-tool binary not present, trying again")
263
264 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
265 # see http://tracker.ceph.com/issues/19556
266 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
267 while proceed():
268 proc = exp_remote.run(args=cmd, wait=True,
269 check_status=False,
270 stdout=StringIO(), stderr=StringIO())
271 if proc.exitstatus == 0:
272 break
                    elif (proc.exitstatus == 1 and
                          "OSD has the store locked" in
                          proc.stderr.getvalue()):
                        continue
275 else:
276 raise Exception("ceph-objectstore-tool: "
277 "exp list-pgs failure with status {ret}".
278 format(ret=proc.exitstatus))
279
280 pgs = proc.stdout.getvalue().split('\n')[:-1]
281 if len(pgs) == 0:
282 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
283 return
284 pg = random.choice(pgs)
285 exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
286 exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
287 exp_path = os.path.join(exp_path,
288 "exp.{pg}.{id}".format(
289 pg=pg,
290 id=exp_osd))
291 # export
292 cmd = prefix + "--op export --pgid {pg} --file {file}"
293 cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
294 proc = exp_remote.run(args=cmd)
295 if proc.exitstatus:
296 raise Exception("ceph-objectstore-tool: "
297 "export failure with status {ret}".
298 format(ret=proc.exitstatus))
299 # remove
300 cmd = prefix + "--op remove --pgid {pg}"
301 cmd = cmd.format(id=exp_osd, pg=pg)
302 proc = exp_remote.run(args=cmd)
303 if proc.exitstatus:
304 raise Exception("ceph-objectstore-tool: "
305 "remove failure with status {ret}".
306 format(ret=proc.exitstatus))
307 # If there are at least 2 dead osds we might move the pg
308 if exp_osd != imp_osd:
309 # If pg isn't already on this osd, then we will move it there
310 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
311 proc = imp_remote.run(args=cmd, wait=True,
312 check_status=False, stdout=StringIO())
313 if proc.exitstatus:
314 raise Exception("ceph-objectstore-tool: "
315 "imp list-pgs failure with status {ret}".
316 format(ret=proc.exitstatus))
317 pgs = proc.stdout.getvalue().split('\n')[:-1]
318 if pg not in pgs:
319 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
320 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
321 if imp_remote != exp_remote:
322 # Copy export file to the other machine
323 self.log("Transfer export file from {srem} to {trem}".
324 format(srem=exp_remote, trem=imp_remote))
325 tmpexport = Remote.get_file(exp_remote, exp_path)
326 Remote.put_file(imp_remote, tmpexport, exp_path)
327 os.remove(tmpexport)
328 else:
329 # Can't move the pg after all
330 imp_osd = exp_osd
331 imp_remote = exp_remote
332 # import
333 cmd = (prefix + "--op import --file {file}")
334 cmd = cmd.format(id=imp_osd, file=exp_path)
335 proc = imp_remote.run(args=cmd, wait=True, check_status=False,
336 stderr=StringIO())
337 if proc.exitstatus == 1:
338 bogosity = "The OSD you are using is older than the exported PG"
339 if bogosity in proc.stderr.getvalue():
340 self.log("OSD older than exported PG"
341 "...ignored")
342 elif proc.exitstatus == 10:
343 self.log("Pool went away before processing an import"
344 "...ignored")
345 elif proc.exitstatus == 11:
346 self.log("Attempt to import an incompatible export"
347 "...ignored")
348 elif proc.exitstatus:
349 raise Exception("ceph-objectstore-tool: "
350 "import failure with status {ret}".
351 format(ret=proc.exitstatus))
352 cmd = "rm -f {file}".format(file=exp_path)
353 exp_remote.run(args=cmd)
354 if imp_remote != exp_remote:
355 imp_remote.run(args=cmd)
356
357 # apply low split settings to each pool
358 for pool in self.ceph_manager.list_pools():
359 no_sudo_prefix = prefix[5:]
360 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
361 "--filestore-split-multiple 1' sudo -E "
362 + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
363 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
364 output = proc.stderr.getvalue()
365 if 'Couldn\'t find pool' in output:
366 continue
367 if proc.exitstatus:
368 raise Exception("ceph-objectstore-tool apply-layout-settings"
369 " failed with {status}".format(status=proc.exitstatus))
370
371 def rm_past_intervals(self, osd=None):
372 """
373 :param osd: Osd to find pg to remove past intervals
374 """
375 if self.test_rm_past_intervals:
376 if osd is None:
377 osd = random.choice(self.dead_osds)
378 self.log("Use ceph_objectstore_tool to remove past intervals")
379 remote = self.ceph_manager.find_remote('osd', osd)
380 FSPATH = self.ceph_manager.get_filepath()
381 JPATH = os.path.join(FSPATH, "journal")
382 if ('keyvaluestore_backend' in
383 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
384 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
385 "--data-path {fpath} --journal-path {jpath} "
386 "--type keyvaluestore "
387 "--log-file="
388 "/var/log/ceph/objectstore_tool.\\$pid.log ".
389 format(fpath=FSPATH, jpath=JPATH))
390 else:
391 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
392 "--data-path {fpath} --journal-path {jpath} "
393 "--log-file="
394 "/var/log/ceph/objectstore_tool.\\$pid.log ".
395 format(fpath=FSPATH, jpath=JPATH))
396 cmd = (prefix + "--op list-pgs").format(id=osd)
397 proc = remote.run(args=cmd, wait=True,
398 check_status=False, stdout=StringIO())
399 if proc.exitstatus:
400 raise Exception("ceph_objectstore_tool: "
401 "exp list-pgs failure with status {ret}".
402 format(ret=proc.exitstatus))
403 pgs = proc.stdout.getvalue().split('\n')[:-1]
404 if len(pgs) == 0:
405 self.log("No PGs found for osd.{osd}".format(osd=osd))
406 return
407 pg = random.choice(pgs)
408 cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
409 format(id=osd, pg=pg)
410 proc = remote.run(args=cmd)
411 if proc.exitstatus:
412 raise Exception("ceph_objectstore_tool: "
413 "rm-past-intervals failure with status {ret}".
414 format(ret=proc.exitstatus))
415
416 def blackhole_kill_osd(self, osd=None):
417 """
418 If all else fails, kill the osd.
419 :param osd: Osd to be killed.
420 """
421 if osd is None:
422 osd = random.choice(self.live_osds)
423 self.log("Blackholing and then killing osd %s, live_osds are %s" %
424 (str(osd), str(self.live_osds)))
425 self.live_osds.remove(osd)
426 self.dead_osds.append(osd)
427 self.ceph_manager.blackhole_kill_osd(osd)
428
429 def revive_osd(self, osd=None, skip_admin_check=False):
430 """
431 Revive the osd.
432 :param osd: Osd to be revived.
433 """
434 if osd is None:
435 osd = random.choice(self.dead_osds)
436 self.log("Reviving osd %s" % (str(osd),))
437 self.ceph_manager.revive_osd(
438 osd,
439 self.revive_timeout,
440 skip_admin_check=skip_admin_check)
441 self.dead_osds.remove(osd)
442 self.live_osds.append(osd)
443
444 def out_osd(self, osd=None):
445 """
446 Mark the osd out
447 :param osd: Osd to be marked.
448 """
449 if osd is None:
450 osd = random.choice(self.in_osds)
451 self.log("Removing osd %s, in_osds are: %s" %
452 (str(osd), str(self.in_osds)))
453 self.ceph_manager.mark_out_osd(osd)
454 self.in_osds.remove(osd)
455 self.out_osds.append(osd)
456
457 def in_osd(self, osd=None):
458 """
        Mark the osd in (reviving it first if it is dead).
460 :param osd: Osd to be marked.
461 """
462 if osd is None:
463 osd = random.choice(self.out_osds)
464 if osd in self.dead_osds:
465 return self.revive_osd(osd)
466 self.log("Adding osd %s" % (str(osd),))
467 self.out_osds.remove(osd)
468 self.in_osds.append(osd)
469 self.ceph_manager.mark_in_osd(osd)
470 self.log("Added osd %s" % (str(osd),))
471
472 def reweight_osd_or_by_util(self, osd=None):
473 """
474 Reweight an osd that is in
475 :param osd: Osd to be marked.
476 """
477 if osd is not None or random.choice([True, False]):
478 if osd is None:
479 osd = random.choice(self.in_osds)
480 val = random.uniform(.1, 1.0)
481 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
482 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
483 str(osd), str(val))
484 else:
485 # do it several times, the option space is large
486 for i in range(5):
487 options = {
488 'max_change': random.choice(['0.05', '1.0', '3.0']),
489 'overage': random.choice(['110', '1000']),
490 'type': random.choice([
491 'reweight-by-utilization',
492 'test-reweight-by-utilization']),
493 }
494 self.log("Reweighting by: %s"%(str(options),))
495 self.ceph_manager.raw_cluster_cmd(
496 'osd',
497 options['type'],
498 options['overage'],
499 options['max_change'])
500
501 def primary_affinity(self, osd=None):
502 if osd is None:
503 osd = random.choice(self.in_osds)
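        # ~50% of the time use a random affinity, ~25% force it to 1,
        # and ~25% force it to 0.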
504 if random.random() >= .5:
505 pa = random.random()
506 elif random.random() >= .5:
507 pa = 1
508 else:
509 pa = 0
510 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
511 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
512 str(osd), str(pa))
513
514 def thrash_cluster_full(self):
515 """
516 Set and unset cluster full condition
517 """
518 self.log('Setting full ratio to .001')
519 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
520 time.sleep(1)
521 self.log('Setting full ratio back to .95')
522 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
523
524 def thrash_pg_upmap(self):
525 """
526 Install or remove random pg_upmap entries in OSDMap
527 """
528 from random import shuffle
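        # ~70% of the time install a random pg-upmap entry for a random pg;
        # otherwise clear one of the existing pg_upmap entries.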
529 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
530 j = json.loads(out)
531 self.log('j is %s' % j)
532 try:
533 if random.random() >= .3:
534 pgs = self.ceph_manager.get_pg_stats()
535 pg = random.choice(pgs)
536 pgid = str(pg['pgid'])
537 poolid = int(pgid.split('.')[0])
538 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
539 if len(sizes) == 0:
540 return
541 n = sizes[0]
542 osds = self.in_osds + self.out_osds
543 shuffle(osds)
544 osds = osds[0:n]
545 self.log('Setting %s to %s' % (pgid, osds))
546 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
547 self.log('cmd %s' % cmd)
548 self.ceph_manager.raw_cluster_cmd(*cmd)
549 else:
550 m = j['pg_upmap']
551 if len(m) > 0:
552 shuffle(m)
553 pg = m[0]['pgid']
554 self.log('Clearing pg_upmap on %s' % pg)
555 self.ceph_manager.raw_cluster_cmd(
556 'osd',
557 'rm-pg-upmap',
558 pg)
559 else:
560 self.log('No pg_upmap entries; doing nothing')
561 except CommandFailedError:
562 self.log('Failed to rm-pg-upmap, ignoring')
563
564 def thrash_pg_upmap_items(self):
565 """
566 Install or remove random pg_upmap_items entries in OSDMap
567 """
568 from random import shuffle
569 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
570 j = json.loads(out)
571 self.log('j is %s' % j)
572 try:
573 if random.random() >= .3:
574 pgs = self.ceph_manager.get_pg_stats()
575 pg = random.choice(pgs)
576 pgid = str(pg['pgid'])
577 poolid = int(pgid.split('.')[0])
578 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
579 if len(sizes) == 0:
580 return
581 n = sizes[0]
582 osds = self.in_osds + self.out_osds
583 shuffle(osds)
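                # pg-upmap-items takes (from, to) osd pairs, so grab 2*n ids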
584 osds = osds[0:n*2]
585 self.log('Setting %s to %s' % (pgid, osds))
586 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
587 self.log('cmd %s' % cmd)
588 self.ceph_manager.raw_cluster_cmd(*cmd)
589 else:
590 m = j['pg_upmap_items']
591 if len(m) > 0:
592 shuffle(m)
593 pg = m[0]['pgid']
                    self.log('Clearing pg_upmap_items on %s' % pg)
595 self.ceph_manager.raw_cluster_cmd(
596 'osd',
597 'rm-pg-upmap-items',
598 pg)
599 else:
                    self.log('No pg_upmap_items entries; doing nothing')
601 except CommandFailedError:
602 self.log('Failed to rm-pg-upmap-items, ignoring')
603
604 def all_up(self):
605 """
606 Make sure all osds are up and not out.
607 """
608 while len(self.dead_osds) > 0:
609 self.log("reviving osd")
610 self.revive_osd()
611 while len(self.out_osds) > 0:
612 self.log("inning osd")
613 self.in_osd()
614
615 def do_join(self):
616 """
617 Break out of this Ceph loop
618 """
619 self.stopping = True
620 self.thread.get()
621 if self.sighup_delay:
622 self.log("joining the do_sighup greenlet")
623 self.sighup_thread.get()
624 if self.optrack_toggle_delay:
625 self.log("joining the do_optrack_toggle greenlet")
626 self.optrack_toggle_thread.join()
627 if self.dump_ops_enable == "true":
628 self.log("joining the do_dump_ops greenlet")
629 self.dump_ops_thread.join()
630 if self.noscrub_toggle_delay:
631 self.log("joining the do_noscrub_toggle greenlet")
632 self.noscrub_toggle_thread.join()
633
634 def grow_pool(self):
635 """
636 Increase the size of the pool
637 """
638 pool = self.ceph_manager.get_pool()
639 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
640 self.log("Growing pool %s" % (pool,))
641 if self.ceph_manager.expand_pool(pool,
642 self.config.get('pool_grow_by', 10),
643 self.max_pgs):
644 self.pools_to_fix_pgp_num.add(pool)
645
646 def fix_pgp_num(self, pool=None):
647 """
648 Fix number of pgs in pool.
649 """
650 if pool is None:
651 pool = self.ceph_manager.get_pool()
652 force = False
653 else:
654 force = True
655 self.log("fixing pg num pool %s" % (pool,))
656 if self.ceph_manager.set_pool_pgpnum(pool, force):
657 self.pools_to_fix_pgp_num.discard(pool)
658
659 def test_pool_min_size(self):
660 """
661 Kill and revive all osds except one.
662 """
663 self.log("test_pool_min_size")
664 self.all_up()
665 self.ceph_manager.wait_for_recovery(
666 timeout=self.config.get('timeout')
667 )
668 the_one = random.choice(self.in_osds)
669 self.log("Killing everyone but %s", the_one)
670 to_kill = filter(lambda x: x != the_one, self.in_osds)
671 [self.kill_osd(i) for i in to_kill]
672 [self.out_osd(i) for i in to_kill]
673 time.sleep(self.config.get("test_pool_min_size_time", 10))
674 self.log("Killing %s" % (the_one,))
675 self.kill_osd(the_one)
676 self.out_osd(the_one)
677 self.log("Reviving everyone but %s" % (the_one,))
678 [self.revive_osd(i) for i in to_kill]
679 [self.in_osd(i) for i in to_kill]
680 self.log("Revived everyone but %s" % (the_one,))
681 self.log("Waiting for clean")
682 self.ceph_manager.wait_for_recovery(
683 timeout=self.config.get('timeout')
684 )
685
686 def inject_pause(self, conf_key, duration, check_after, should_be_down):
687 """
688 Pause injection testing. Check for osd being down when finished.
689 """
690 the_one = random.choice(self.live_osds)
691 self.log("inject_pause on {osd}".format(osd=the_one))
692 self.log(
693 "Testing {key} pause injection for duration {duration}".format(
694 key=conf_key,
695 duration=duration
696 ))
697 self.log(
698 "Checking after {after}, should_be_down={shouldbedown}".format(
699 after=check_after,
700 shouldbedown=should_be_down
701 ))
702 self.ceph_manager.set_config(the_one, **{conf_key: duration})
703 if not should_be_down:
704 return
705 time.sleep(check_after)
706 status = self.ceph_manager.get_osd_status()
707 assert the_one in status['down']
708 time.sleep(duration - check_after + 20)
709 status = self.ceph_manager.get_osd_status()
        assert the_one not in status['down']
711
712 def test_backfill_full(self):
713 """
714 Test backfills stopping when the replica fills up.
715
716 First, use injectfull admin command to simulate a now full
717 osd by setting it to 0 on all of the OSDs.
718
719 Second, on a random subset, set
720 osd_debug_skip_full_check_in_backfill_reservation to force
721 the more complicated check in do_scan to be exercised.
722
723 Then, verify that all backfills stop.
724 """
725 self.log("injecting backfill full")
726 for i in self.live_osds:
727 self.ceph_manager.set_config(
728 i,
729 osd_debug_skip_full_check_in_backfill_reservation=
730 random.choice(['false', 'true']))
731 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
732 check_status=True, timeout=30, stdout=DEVNULL)
733 for i in range(30):
734 status = self.ceph_manager.compile_pg_status()
735 if 'backfill' not in status.keys():
736 break
737 self.log(
738 "waiting for {still_going} backfills".format(
739 still_going=status.get('backfill')))
740 time.sleep(1)
741 assert('backfill' not in self.ceph_manager.compile_pg_status().keys())
742 for i in self.live_osds:
743 self.ceph_manager.set_config(
744 i,
745 osd_debug_skip_full_check_in_backfill_reservation='false')
746 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
747 check_status=True, timeout=30, stdout=DEVNULL)
748
749 def test_map_discontinuity(self):
750 """
751 1) Allows the osds to recover
752 2) kills an osd
753 3) allows the remaining osds to recover
754 4) waits for some time
755 5) revives the osd
756 This sequence should cause the revived osd to have to handle
757 a map gap since the mons would have trimmed
758 """
759 while len(self.in_osds) < (self.minin + 1):
760 self.in_osd()
761 self.log("Waiting for recovery")
762 self.ceph_manager.wait_for_all_up(
763 timeout=self.config.get('timeout')
764 )
765 # now we wait 20s for the pg status to change, if it takes longer,
766 # the test *should* fail!
767 time.sleep(20)
768 self.ceph_manager.wait_for_clean(
769 timeout=self.config.get('timeout')
770 )
771
772 # now we wait 20s for the backfill replicas to hear about the clean
773 time.sleep(20)
774 self.log("Recovered, killing an osd")
775 self.kill_osd(mark_down=True, mark_out=True)
776 self.log("Waiting for clean again")
777 self.ceph_manager.wait_for_clean(
778 timeout=self.config.get('timeout')
779 )
780 self.log("Waiting for trim")
781 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
782 self.revive_osd()
783
784 def choose_action(self):
785 """
786 Random action selector.
787 """
788 chance_down = self.config.get('chance_down', 0.4)
789 chance_test_min_size = self.config.get('chance_test_min_size', 0)
790 chance_test_backfill_full = \
791 self.config.get('chance_test_backfill_full', 0)
792 if isinstance(chance_down, int):
793 chance_down = float(chance_down) / 100
794 minin = self.minin
795 minout = self.config.get("min_out", 0)
796 minlive = self.config.get("min_live", 2)
797 mindead = self.config.get("min_dead", 0)
798
799 self.log('choose_action: min_in %d min_out '
800 '%d min_live %d min_dead %d' %
801 (minin, minout, minlive, mindead))
802 actions = []
803 if len(self.in_osds) > minin:
804 actions.append((self.out_osd, 1.0,))
805 if len(self.live_osds) > minlive and chance_down > 0:
806 actions.append((self.kill_osd, chance_down,))
807 if len(self.dead_osds) > 1:
808 actions.append((self.rm_past_intervals, 1.0,))
809 if len(self.out_osds) > minout:
810 actions.append((self.in_osd, 1.7,))
811 if len(self.dead_osds) > mindead:
812 actions.append((self.revive_osd, 1.0,))
813 if self.config.get('thrash_primary_affinity', True):
814 actions.append((self.primary_affinity, 1.0,))
815 actions.append((self.reweight_osd_or_by_util,
816 self.config.get('reweight_osd', .5),))
817 actions.append((self.grow_pool,
818 self.config.get('chance_pgnum_grow', 0),))
819 actions.append((self.fix_pgp_num,
820 self.config.get('chance_pgpnum_fix', 0),))
821 actions.append((self.test_pool_min_size,
822 chance_test_min_size,))
823 actions.append((self.test_backfill_full,
824 chance_test_backfill_full,))
825 if self.chance_thrash_cluster_full > 0:
826 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
827 if self.chance_thrash_pg_upmap > 0:
828 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
829 if self.chance_thrash_pg_upmap_items > 0:
830 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
831
832 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
833 for scenario in [
834 (lambda:
835 self.inject_pause(key,
836 self.config.get('pause_short', 3),
837 0,
838 False),
839 self.config.get('chance_inject_pause_short', 1),),
840 (lambda:
841 self.inject_pause(key,
842 self.config.get('pause_long', 80),
843 self.config.get('pause_check_after', 70),
844 True),
845 self.config.get('chance_inject_pause_long', 0),)]:
846 actions.append(scenario)
847
848 total = sum([y for (x, y) in actions])
849 val = random.uniform(0, total)
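        # Roulette-wheel selection: walk the actions, subtracting each weight
        # from val until it falls inside an action's slot.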
850 for (action, prob) in actions:
851 if val < prob:
852 return action
853 val -= prob
854 return None
855
856 def log_exc(func):
857 @wraps(func)
858 def wrapper(self):
859 try:
860 return func(self)
861 except:
862 self.log(traceback.format_exc())
863 raise
864 return wrapper
865
866 @log_exc
867 def do_sighup(self):
868 """
869 Loops and sends signal.SIGHUP to a random live osd.
870
871 Loop delay is controlled by the config value sighup_delay.
872 """
873 delay = float(self.sighup_delay)
874 self.log("starting do_sighup with a delay of {0}".format(delay))
875 while not self.stopping:
876 osd = random.choice(self.live_osds)
877 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
878 time.sleep(delay)
879
880 @log_exc
881 def do_optrack_toggle(self):
882 """
883 Loops and toggle op tracking to all osds.
884
885 Loop delay is controlled by the config value optrack_toggle_delay.
886 """
887 delay = float(self.optrack_toggle_delay)
888 osd_state = "true"
889 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
890 while not self.stopping:
891 if osd_state == "true":
892 osd_state = "false"
893 else:
894 osd_state = "true"
895 self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
896 'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
897 gevent.sleep(delay)
898
899 @log_exc
900 def do_dump_ops(self):
901 """
902 Loops and does op dumps on all osds
903 """
904 self.log("starting do_dump_ops")
905 while not self.stopping:
906 for osd in self.live_osds:
907 # Ignore errors because live_osds is in flux
908 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
909 check_status=False, timeout=30, stdout=DEVNULL)
910 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
911 check_status=False, timeout=30, stdout=DEVNULL)
912 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
913 check_status=False, timeout=30, stdout=DEVNULL)
914 gevent.sleep(0)
915
916 @log_exc
917 def do_noscrub_toggle(self):
918 """
919 Loops and toggle noscrub flags
920
921 Loop delay is controlled by the config value noscrub_toggle_delay.
922 """
923 delay = float(self.noscrub_toggle_delay)
924 scrub_state = "none"
925 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
926 while not self.stopping:
927 if scrub_state == "none":
928 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
929 scrub_state = "noscrub"
930 elif scrub_state == "noscrub":
931 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
932 scrub_state = "both"
933 elif scrub_state == "both":
934 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
935 scrub_state = "nodeep-scrub"
936 else:
937 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
938 scrub_state = "none"
939 gevent.sleep(delay)
940 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
941 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
942
943 @log_exc
944 def do_thrash(self):
945 """
946 Loop to select random actions to thrash ceph manager with.
947 """
948 cleanint = self.config.get("clean_interval", 60)
949 scrubint = self.config.get("scrub_interval", -1)
950 maxdead = self.config.get("max_dead", 0)
951 delay = self.config.get("op_delay", 5)
952 self.log("starting do_thrash")
953 while not self.stopping:
954 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
955 "out_osds: ", self.out_osds,
956 "dead_osds: ", self.dead_osds,
957 "live_osds: ", self.live_osds]]
958 self.log(" ".join(to_log))
959 if random.uniform(0, 1) < (float(delay) / cleanint):
960 while len(self.dead_osds) > maxdead:
961 self.revive_osd()
962 for osd in self.in_osds:
963 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
964 str(osd), str(1))
965 if random.uniform(0, 1) < float(
966 self.config.get('chance_test_map_discontinuity', 0)):
967 self.test_map_discontinuity()
968 else:
969 self.ceph_manager.wait_for_recovery(
970 timeout=self.config.get('timeout')
971 )
972 time.sleep(self.clean_wait)
973 if scrubint > 0:
974 if random.uniform(0, 1) < (float(delay) / scrubint):
975 self.log('Scrubbing while thrashing being performed')
976 Scrubber(self.ceph_manager, self.config)
977 self.choose_action()()
978 time.sleep(delay)
979 for pool in list(self.pools_to_fix_pgp_num):
980 if self.ceph_manager.get_pool_pg_num(pool) > 0:
981 self.fix_pgp_num(pool)
982 self.pools_to_fix_pgp_num.clear()
983 for service, opt, saved_value in self.saved_options:
984 self._set_config(service, '*', opt, saved_value)
985 self.saved_options = []
986 self.all_up()
987
988
989class ObjectStoreTool:
990
991 def __init__(self, manager, pool, **kwargs):
992 self.manager = manager
993 self.pool = pool
994 self.osd = kwargs.get('osd', None)
995 self.object_name = kwargs.get('object_name', None)
996 self.do_revive = kwargs.get('do_revive', True)
997 if self.osd and self.pool and self.object_name:
998 if self.osd == "primary":
999 self.osd = self.manager.get_object_primary(self.pool,
1000 self.object_name)
1001 assert self.osd
1002 if self.object_name:
1003 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1004 self.object_name,
1005 self.osd)
1006 self.remote = self.manager.ctx.\
1007 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1008 path = self.manager.get_filepath().format(id=self.osd)
1009 self.paths = ("--data-path {path} --journal-path {path}/journal".
1010 format(path=path))
1011
1012 def build_cmd(self, options, args, stdin):
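        """
        Build a bash snippet that runs ceph-objectstore-tool with the given
        options/args; when object_name is set, it is first resolved to its
        JSON listing via '--op list', and stdin (if any) is passed through
        the shell base64-encoded.
        """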
1013 lines = []
1014 if self.object_name:
1015 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1016 "{paths} --pgid {pgid} --op list |"
1017 "grep '\"oid\":\"{name}\"')".
1018 format(paths=self.paths,
1019 pgid=self.pgid,
1020 name=self.object_name))
1021 args = '"$object" ' + args
1022 options += " --pgid {pgid}".format(pgid=self.pgid)
1023 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1024 format(paths=self.paths,
1025 args=args,
1026 options=options))
1027 if stdin:
1028 cmd = ("echo {payload} | base64 --decode | {cmd}".
                   format(payload=base64.b64encode(stdin),
1030 cmd=cmd))
1031 lines.append(cmd)
1032 return "\n".join(lines)
1033
1034 def run(self, options, args, stdin=None, stdout=None):
1035 if stdout is None:
1036 stdout = StringIO()
1037 self.manager.kill_osd(self.osd)
1038 cmd = self.build_cmd(options, args, stdin)
1039 self.manager.log(cmd)
1040 try:
1041 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1042 check_status=False,
1043 stdout=stdout,
1044 stderr=StringIO())
1045 proc.wait()
1046 if proc.exitstatus != 0:
1047 self.manager.log("failed with " + str(proc.exitstatus))
1048 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1049 raise Exception(error)
1050 finally:
1051 if self.do_revive:
1052 self.manager.revive_osd(self.osd)
1053
1054
1055class CephManager:
1056 """
1057 Ceph manager object.
1058 Contains several local functions that form a bulk of this module.
1059
1060 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1061 the same name.
1062 """
1063
1064 REPLICATED_POOL = 1
1065 ERASURE_CODED_POOL = 3
1066
1067 def __init__(self, controller, ctx=None, config=None, logger=None,
1068 cluster='ceph'):
1069 self.lock = threading.RLock()
1070 self.ctx = ctx
1071 self.config = config
1072 self.controller = controller
1073 self.next_pool_id = 0
1074 self.cluster = cluster
1075 if (logger):
1076 self.log = lambda x: logger.info(x)
1077 else:
1078 def tmp(x):
1079 """
1080 implement log behavior.
1081 """
1082 print x
1083 self.log = tmp
1084 if self.config is None:
1085 self.config = dict()
1086 pools = self.list_pools()
1087 self.pools = {}
1088 for pool in pools:
1089 # we may race with a pool deletion; ignore failures here
1090 try:
1091 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1092 except CommandFailedError:
1093 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1094
1095 def raw_cluster_cmd(self, *args):
1096 """
        Run a 'ceph' command against the cluster and return its stdout.
1098 """
1099 testdir = teuthology.get_testdir(self.ctx)
1100 ceph_args = [
1101 'sudo',
1102 'adjust-ulimits',
1103 'ceph-coverage',
1104 '{tdir}/archive/coverage'.format(tdir=testdir),
1105 'timeout',
1106 '120',
1107 'ceph',
1108 '--cluster',
1109 self.cluster,
1110 ]
1111 ceph_args.extend(args)
1112 proc = self.controller.run(
1113 args=ceph_args,
1114 stdout=StringIO(),
1115 )
1116 return proc.stdout.getvalue()
1117
1118 def raw_cluster_cmd_result(self, *args):
1119 """
        Run a 'ceph' command against the cluster and return its exit status.
1121 """
1122 testdir = teuthology.get_testdir(self.ctx)
1123 ceph_args = [
1124 'sudo',
1125 'adjust-ulimits',
1126 'ceph-coverage',
1127 '{tdir}/archive/coverage'.format(tdir=testdir),
1128 'timeout',
1129 '120',
1130 'ceph',
1131 '--cluster',
1132 self.cluster,
1133 ]
1134 ceph_args.extend(args)
1135 proc = self.controller.run(
1136 args=ceph_args,
1137 check_status=False,
1138 )
1139 return proc.exitstatus
1140
1141 def run_ceph_w(self):
1142 """
1143 Execute "ceph -w" in the background with stdout connected to a StringIO,
1144 and return the RemoteProcess.
1145 """
1146 return self.controller.run(
1147 args=["sudo",
1148 "daemon-helper",
1149 "kill",
1150 "ceph",
1151 '--cluster',
1152 self.cluster,
1153 "-w"],
1154 wait=False, stdout=StringIO(), stdin=run.PIPE)
1155
1156 def do_rados(self, remote, cmd, check_status=True):
1157 """
1158 Execute a remote rados command.
1159 """
1160 testdir = teuthology.get_testdir(self.ctx)
1161 pre = [
1162 'adjust-ulimits',
1163 'ceph-coverage',
1164 '{tdir}/archive/coverage'.format(tdir=testdir),
1165 'rados',
1166 '--cluster',
1167 self.cluster,
1168 ]
1169 pre.extend(cmd)
1170 proc = remote.run(
1171 args=pre,
1172 wait=True,
1173 check_status=check_status
1174 )
1175 return proc
1176
1177 def rados_write_objects(self, pool, num_objects, size,
1178 timelimit, threads, cleanup=False):
1179 """
1180 Write rados objects
1181 Threads not used yet.
1182 """
1183 args = [
1184 '-p', pool,
1185 '--num-objects', num_objects,
1186 '-b', size,
1187 'bench', timelimit,
1188 'write'
1189 ]
1190 if not cleanup:
1191 args.append('--no-cleanup')
1192 return self.do_rados(self.controller, map(str, args))
1193
1194 def do_put(self, pool, obj, fname, namespace=None):
1195 """
1196 Implement rados put operation
1197 """
1198 args = ['-p', pool]
1199 if namespace is not None:
1200 args += ['-N', namespace]
1201 args += [
1202 'put',
1203 obj,
1204 fname
1205 ]
1206 return self.do_rados(
1207 self.controller,
1208 args,
1209 check_status=False
1210 ).exitstatus
1211
1212 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1213 """
1214 Implement rados get operation
1215 """
1216 args = ['-p', pool]
1217 if namespace is not None:
1218 args += ['-N', namespace]
1219 args += [
1220 'get',
1221 obj,
1222 fname
1223 ]
1224 return self.do_rados(
1225 self.controller,
1226 args,
1227 check_status=False
1228 ).exitstatus
1229
1230 def do_rm(self, pool, obj, namespace=None):
1231 """
1232 Implement rados rm operation
1233 """
1234 args = ['-p', pool]
1235 if namespace is not None:
1236 args += ['-N', namespace]
1237 args += [
1238 'rm',
1239 obj
1240 ]
1241 return self.do_rados(
1242 self.controller,
1243 args,
1244 check_status=False
1245 ).exitstatus
1246
1247 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
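        """
        Run a command against the admin socket of the given osd
        (thin wrapper around admin_socket()).
        """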
1248 if stdout is None:
1249 stdout = StringIO()
1250 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1251
1252 def find_remote(self, service_type, service_id):
1253 """
1254 Get the Remote for the host where a particular service runs.
1255
1256 :param service_type: 'mds', 'osd', 'client'
1257 :param service_id: The second part of a role, e.g. '0' for
1258 the role 'client.0'
1259 :return: a Remote instance for the host where the
1260 requested role is placed
1261 """
1262 return get_remote(self.ctx, self.cluster,
1263 service_type, service_id)
1264
1265 def admin_socket(self, service_type, service_id,
1266 command, check_status=True, timeout=0, stdout=None):
1267 """
1268 Remotely start up ceph specifying the admin socket
1269 :param command: a list of words to use as the command
1270 to the admin socket
1271 """
1272 if stdout is None:
1273 stdout = StringIO()
1274 testdir = teuthology.get_testdir(self.ctx)
1275 remote = self.find_remote(service_type, service_id)
1276 args = [
1277 'sudo',
1278 'adjust-ulimits',
1279 'ceph-coverage',
1280 '{tdir}/archive/coverage'.format(tdir=testdir),
1281 'timeout',
1282 str(timeout),
1283 'ceph',
1284 '--cluster',
1285 self.cluster,
1286 '--admin-daemon',
1287 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1288 cluster=self.cluster,
1289 type=service_type,
1290 id=service_id),
1291 ]
1292 args.extend(command)
1293 return remote.run(
1294 args=args,
1295 stdout=stdout,
1296 wait=True,
1297 check_status=check_status
1298 )
1299
1300 def objectstore_tool(self, pool, options, args, **kwargs):
1301 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1302
1303 def get_pgid(self, pool, pgnum):
1304 """
1305 :param pool: pool name
1306 :param pgnum: pg number
1307 :returns: a string representing this pg.
1308 """
1309 poolnum = self.get_pool_num(pool)
1310 pg_str = "{poolnum}.{pgnum}".format(
1311 poolnum=poolnum,
1312 pgnum=pgnum)
1313 return pg_str
1314
1315 def get_pg_replica(self, pool, pgnum):
1316 """
        get replica for pool, pgnum (e.g. (data, 0) -> 0)
1318 """
1319 pg_str = self.get_pgid(pool, pgnum)
1320 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1321 j = json.loads('\n'.join(output.split('\n')[1:]))
1322 return int(j['acting'][-1])
1323 assert False
1324
1325 def wait_for_pg_stats(func):
1326 # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1327 # by default, and take the faulty injection in ms into consideration,
1328 # 12 seconds are more than enough
1329 delays = [1, 1, 2, 3, 5, 8, 13]
1330 @wraps(func)
1331 def wrapper(self, *args, **kwargs):
1332 exc = None
1333 for delay in delays:
1334 try:
1335 return func(self, *args, **kwargs)
1336 except AssertionError as e:
1337 time.sleep(delay)
1338 exc = e
1339 raise exc
1340 return wrapper
1341
1342 def get_pg_primary(self, pool, pgnum):
1343 """
        get primary for pool, pgnum (e.g. (data, 0) -> 0)
1345 """
1346 pg_str = self.get_pgid(pool, pgnum)
1347 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1348 j = json.loads('\n'.join(output.split('\n')[1:]))
1349 return int(j['acting'][0])
1350 assert False
1351
1352 def get_pool_num(self, pool):
1353 """
1354 get number for pool (e.g., data -> 2)
1355 """
1356 return int(self.get_pool_dump(pool)['pool'])
1357
1358 def list_pools(self):
1359 """
1360 list all pool names
1361 """
1362 osd_dump = self.get_osd_dump_json()
1363 self.log(osd_dump['pools'])
1364 return [str(i['pool_name']) for i in osd_dump['pools']]
1365
1366 def clear_pools(self):
1367 """
1368 remove all pools
1369 """
1370 [self.remove_pool(i) for i in self.list_pools()]
1371
1372 def kick_recovery_wq(self, osdnum):
1373 """
1374 Run kick_recovery_wq on cluster.
1375 """
1376 return self.raw_cluster_cmd(
1377 'tell', "osd.%d" % (int(osdnum),),
1378 'debug',
1379 'kick_recovery_wq',
1380 '0')
1381
1382 def wait_run_admin_socket(self, service_type,
1383 service_id, args=['version'], timeout=75, stdout=None):
1384 """
        If the admin_socket call succeeds, return.  Otherwise wait
        five seconds and try again.
1387 """
1388 if stdout is None:
1389 stdout = StringIO()
1390 tries = 0
1391 while True:
1392 proc = self.admin_socket(service_type, service_id,
1393 args, check_status=False, stdout=stdout)
            if proc.exitstatus == 0:
1395 return proc
1396 else:
1397 tries += 1
1398 if (tries * 5) > timeout:
1399 raise Exception('timed out waiting for admin_socket '
1400 'to appear after {type}.{id} restart'.
1401 format(type=service_type,
1402 id=service_id))
1403 self.log("waiting on admin_socket for {type}-{id}, "
1404 "{command}".format(type=service_type,
1405 id=service_id,
1406 command=args))
1407 time.sleep(5)
1408
1409 def get_pool_dump(self, pool):
1410 """
1411 get the osd dump part of a pool
1412 """
1413 osd_dump = self.get_osd_dump_json()
1414 for i in osd_dump['pools']:
1415 if i['pool_name'] == pool:
1416 return i
1417 assert False
1418
1419 def get_config(self, service_type, service_id, name):
1420 """
        :param service_type: service type, e.g. 'mon'
        :param service_id: service id, e.g. 'a' (for 'mon.a')
        :param name: the option name
        :returns: the current value of the named option
1423 """
1424 proc = self.wait_run_admin_socket(service_type, service_id,
1425 ['config', 'show'])
1426 j = json.loads(proc.stdout.getvalue())
1427 return j[name]
1428
1429 def set_config(self, osdnum, **argdict):
1430 """
1431 :param osdnum: osd number
1432 :param argdict: dictionary containing values to set.
1433 """
1434 for k, v in argdict.iteritems():
1435 self.wait_run_admin_socket(
1436 'osd', osdnum,
1437 ['config', 'set', str(k), str(v)])
1438
1439 def raw_cluster_status(self):
1440 """
1441 Get status from cluster
1442 """
1443 status = self.raw_cluster_cmd('status', '--format=json-pretty')
1444 return json.loads(status)
1445
1446 def raw_osd_status(self):
1447 """
1448 Get osd status from cluster
1449 """
1450 return self.raw_cluster_cmd('osd', 'dump')
1451
1452 def get_osd_status(self):
1453 """
1454 Get osd statuses sorted by states that the osds are in.
1455 """
1456 osd_lines = filter(
1457 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1458 self.raw_osd_status().split('\n'))
1459 self.log(osd_lines)
1460 in_osds = [int(i[4:].split()[0])
1461 for i in filter(lambda x: " in " in x, osd_lines)]
1462 out_osds = [int(i[4:].split()[0])
1463 for i in filter(lambda x: " out " in x, osd_lines)]
1464 up_osds = [int(i[4:].split()[0])
1465 for i in filter(lambda x: " up " in x, osd_lines)]
1466 down_osds = [int(i[4:].split()[0])
1467 for i in filter(lambda x: " down " in x, osd_lines)]
1468 dead_osds = [int(x.id_)
1469 for x in filter(lambda x:
1470 not x.running(),
1471 self.ctx.daemons.
1472 iter_daemons_of_role('osd', self.cluster))]
1473 live_osds = [int(x.id_) for x in
1474 filter(lambda x:
1475 x.running(),
1476 self.ctx.daemons.iter_daemons_of_role('osd',
1477 self.cluster))]
1478 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1479 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1480 'raw': osd_lines}
1481
1482 def get_num_pgs(self):
1483 """
1484 Check cluster status for the number of pgs
1485 """
1486 status = self.raw_cluster_status()
1487 self.log(status)
1488 return status['pgmap']['num_pgs']
1489
1490 def create_erasure_code_profile(self, profile_name, profile):
1491 """
1492 Create an erasure code profile name that can be used as a parameter
1493 when creating an erasure coded pool.
1494 """
1495 with self.lock:
1496 args = cmd_erasure_code_profile(profile_name, profile)
1497 self.raw_cluster_cmd(*args)
1498
1499 def create_pool_with_unique_name(self, pg_num=16,
1500 erasure_code_profile_name=None,
1501 min_size=None,
1502 erasure_code_use_overwrites=False):
1503 """
1504 Create a pool named unique_pool_X where X is unique.
1505 """
1506 name = ""
1507 with self.lock:
1508 name = "unique_pool_%s" % (str(self.next_pool_id),)
1509 self.next_pool_id += 1
1510 self.create_pool(
1511 name,
1512 pg_num,
1513 erasure_code_profile_name=erasure_code_profile_name,
1514 min_size=min_size,
1515 erasure_code_use_overwrites=erasure_code_use_overwrites)
1516 return name
1517
1518 @contextlib.contextmanager
1519 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1520 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1521 yield
1522 self.remove_pool(pool_name)
1523
1524 def create_pool(self, pool_name, pg_num=16,
1525 erasure_code_profile_name=None,
1526 min_size=None,
1527 erasure_code_use_overwrites=False):
1528 """
1529 Create a pool named from the pool_name parameter.
1530 :param pool_name: name of the pool being created.
1531 :param pg_num: initial number of pgs.
1532 :param erasure_code_profile_name: if set and !None create an
1533 erasure coded pool using the profile
1534 :param erasure_code_use_overwrites: if true, allow overwrites
1535 """
1536 with self.lock:
1537 assert isinstance(pool_name, basestring)
1538 assert isinstance(pg_num, int)
1539 assert pool_name not in self.pools
1540 self.log("creating pool_name %s" % (pool_name,))
1541 if erasure_code_profile_name:
1542 self.raw_cluster_cmd('osd', 'pool', 'create',
1543 pool_name, str(pg_num), str(pg_num),
1544 'erasure', erasure_code_profile_name)
1545 else:
1546 self.raw_cluster_cmd('osd', 'pool', 'create',
1547 pool_name, str(pg_num))
1548 if min_size is not None:
1549 self.raw_cluster_cmd(
1550 'osd', 'pool', 'set', pool_name,
1551 'min_size',
1552 str(min_size))
1553 if erasure_code_use_overwrites:
1554 self.raw_cluster_cmd(
1555 'osd', 'pool', 'set', pool_name,
1556 'allow_ec_overwrites',
1557 'true')
1558 self.pools[pool_name] = pg_num
1559 time.sleep(1)
1560
1561 def add_pool_snap(self, pool_name, snap_name):
1562 """
1563 Add pool snapshot
1564 :param pool_name: name of pool to snapshot
1565 :param snap_name: name of snapshot to take
1566 """
1567 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1568 str(pool_name), str(snap_name))
1569
1570 def remove_pool_snap(self, pool_name, snap_name):
1571 """
1572 Remove pool snapshot
1573 :param pool_name: name of pool to snapshot
1574 :param snap_name: name of snapshot to remove
1575 """
1576 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1577 str(pool_name), str(snap_name))
1578
1579 def remove_pool(self, pool_name):
1580 """
1581 Remove the indicated pool
1582 :param pool_name: Pool to be removed
1583 """
1584 with self.lock:
1585 assert isinstance(pool_name, basestring)
1586 assert pool_name in self.pools
1587 self.log("removing pool_name %s" % (pool_name,))
1588 del self.pools[pool_name]
1589 self.do_rados(self.controller,
1590 ['rmpool', pool_name, pool_name,
1591 "--yes-i-really-really-mean-it"])
1592
1593 def get_pool(self):
1594 """
1595 Pick a random pool
1596 """
1597 with self.lock:
1598 return random.choice(self.pools.keys())
1599
1600 def get_pool_pg_num(self, pool_name):
1601 """
1602 Return the number of pgs in the pool specified.
1603 """
1604 with self.lock:
1605 assert isinstance(pool_name, basestring)
1606 if pool_name in self.pools:
1607 return self.pools[pool_name]
1608 return 0
1609
1610 def get_pool_property(self, pool_name, prop):
1611 """
1612 :param pool_name: pool
1613 :param prop: property to be checked.
1614 :returns: property as an int value.
1615 """
1616 with self.lock:
1617 assert isinstance(pool_name, basestring)
1618 assert isinstance(prop, basestring)
1619 output = self.raw_cluster_cmd(
1620 'osd',
1621 'pool',
1622 'get',
1623 pool_name,
1624 prop)
1625 return int(output.split()[1])
1626
1627 def set_pool_property(self, pool_name, prop, val):
1628 """
1629 :param pool_name: pool
1630 :param prop: property to be set.
1631 :param val: value to set.
1632
1633 This routine retries if set operation fails.
1634 """
1635 with self.lock:
1636 assert isinstance(pool_name, basestring)
1637 assert isinstance(prop, basestring)
1638 assert isinstance(val, int)
1639 tries = 0
1640 while True:
1641 r = self.raw_cluster_cmd_result(
1642 'osd',
1643 'pool',
1644 'set',
1645 pool_name,
1646 prop,
1647 str(val))
1648 if r != 11: # EAGAIN
1649 break
1650 tries += 1
1651 if tries > 50:
1652 raise Exception('timed out getting EAGAIN '
1653 'when setting pool property %s %s = %s' %
1654 (pool_name, prop, val))
1655 self.log('got EAGAIN setting pool property, '
1656 'waiting a few seconds...')
1657 time.sleep(2)
1658
1659 def expand_pool(self, pool_name, by, max_pgs):
1660 """
1661 Increase the number of pgs in a pool
1662 """
1663 with self.lock:
1664 assert isinstance(pool_name, basestring)
1665 assert isinstance(by, int)
1666 assert pool_name in self.pools
1667 if self.get_num_creating() > 0:
1668 return False
1669 if (self.pools[pool_name] + by) > max_pgs:
1670 return False
1671 self.log("increase pool size by %d" % (by,))
1672 new_pg_num = self.pools[pool_name] + by
1673 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1674 self.pools[pool_name] = new_pg_num
1675 return True
1676
1677 def set_pool_pgpnum(self, pool_name, force):
1678 """
1679 Set pgpnum property of pool_name pool.
1680 """
1681 with self.lock:
1682 assert isinstance(pool_name, basestring)
1683 assert pool_name in self.pools
1684 if not force and self.get_num_creating() > 0:
1685 return False
1686 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1687 return True
1688
1689 def list_pg_missing(self, pgid):
1690 """
1691 return list of missing pgs with the id specified
1692 """
1693 r = None
1694 offset = {}
1695 while True:
1696 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
1697 json.dumps(offset))
1698 j = json.loads(out)
1699 if r is None:
1700 r = j
1701 else:
1702 r['objects'].extend(j['objects'])
            if 'more' not in j:
1704 break
1705 if j['more'] == 0:
1706 break
1707 offset = j['objects'][-1]['oid']
1708 if 'more' in r:
1709 del r['more']
1710 return r
1711
1712 def get_pg_stats(self):
1713 """
1714 Dump the cluster and get pg stats
1715 """
1716 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
1717 j = json.loads('\n'.join(out.split('\n')[1:]))
1718 return j['pg_stats']
1719
1720 def compile_pg_status(self):
1721 """
1722 Return a histogram of pg state values
1723 """
1724 ret = {}
1725 j = self.get_pg_stats()
1726 for pg in j:
1727 for status in pg['state'].split('+'):
1728 if status not in ret:
1729 ret[status] = 0
1730 ret[status] += 1
1731 return ret
1732
1733 @wait_for_pg_stats
1734 def with_pg_state(self, pool, pgnum, check):
1735 pgstr = self.get_pgid(pool, pgnum)
1736 stats = self.get_single_pg_stats(pgstr)
1737 assert(check(stats['state']))
1738
1739 @wait_for_pg_stats
1740 def with_pg(self, pool, pgnum, check):
1741 pgstr = self.get_pgid(pool, pgnum)
1742 stats = self.get_single_pg_stats(pgstr)
1743 return check(stats)
1744
1745 def get_last_scrub_stamp(self, pool, pgnum):
1746 """
1747 Get the timestamp of the last scrub.
1748 """
1749 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
1750 return stats["last_scrub_stamp"]
1751
1752 def do_pg_scrub(self, pool, pgnum, stype):
1753 """
1754 Scrub pg and wait for scrubbing to finish
1755 """
1756 init = self.get_last_scrub_stamp(pool, pgnum)
1757 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
1758 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1759 SLEEP_TIME = 10
1760 timer = 0
1761 while init == self.get_last_scrub_stamp(pool, pgnum):
1762 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1763 self.log("waiting for scrub type %s" % (stype,))
1764 if (timer % RESEND_TIMEOUT) == 0:
1765 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1766 # The first time in this loop is the actual request
1767 if timer != 0 and stype == "repair":
1768 self.log("WARNING: Resubmitted a non-idempotent repair")
1769 time.sleep(SLEEP_TIME)
1770 timer += SLEEP_TIME
1771
1772 def wait_snap_trimming_complete(self, pool):
1773 """
1774 Wait for snap trimming on pool to end
1775 """
1776 POLL_PERIOD = 10
1777 FATAL_TIMEOUT = 600
1778 start = time.time()
1779 poolnum = self.get_pool_num(pool)
1780 poolnumstr = "%s." % (poolnum,)
1781 while (True):
1782 now = time.time()
1783 if (now - start) > FATAL_TIMEOUT:
1784 assert (now - start) < FATAL_TIMEOUT, \
1785 'failed to complete snap trimming before timeout'
1786 all_stats = self.get_pg_stats()
1787 trimming = False
1788 for pg in all_stats:
1789 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
1790 self.log("pg {pg} in trimming, state: {state}".format(
1791 pg=pg['pgid'],
1792 state=pg['state']))
1793 trimming = True
1794 if not trimming:
1795 break
1796 self.log("{pool} still trimming, waiting".format(pool=pool))
1797 time.sleep(POLL_PERIOD)
1798
1799 def get_single_pg_stats(self, pgid):
1800 """
1801 Return pg for the pgid specified.
1802 """
1803 all_stats = self.get_pg_stats()
1804
1805 for pg in all_stats:
1806 if pg['pgid'] == pgid:
1807 return pg
1808
1809 return None
1810
1811 def get_object_pg_with_shard(self, pool, name, osdid):
1812 """
1813 """
1814 pool_dump = self.get_pool_dump(pool)
1815 object_map = self.get_object_map(pool, name)
1816 if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1817 shard = object_map['acting'].index(osdid)
1818 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1819 shard=shard)
1820 else:
1821 return object_map['pgid']
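# Example sketch (names and values illustrative): for an erasure-coded
# pool the returned id carries the shard of osdid within the acting set,
# e.g. '3.1as0'; for a replicated pool it is the bare pgid, e.g. '3.1a':
#   pgid = manager.get_object_pg_with_shard('ecpool', 'obj1', 2)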
1822
1823 def get_object_primary(self, pool, name):
1824 """
1825 """
1826 object_map = self.get_object_map(pool, name)
1827 return object_map['acting_primary']
1828
1829 def get_object_map(self, pool, name):
1830 """
1831 osd map --format=json converted to a python object
1832 :returns: the python object
1833 """
1834 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
1835 return json.loads('\n'.join(out.split('\n')[1:]))
1836
1837 def get_osd_dump_json(self):
1838 """
1839 osd dump --format=json converted to a python object
1840 :returns: the python object
1841 """
1842 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
1843 return json.loads('\n'.join(out.split('\n')[1:]))
1844
1845 def get_osd_dump(self):
1846 """
1847 Dump osds
1848 :returns: all osds
1849 """
1850 return self.get_osd_dump_json()['osds']
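# Example sketch: each entry is an osdmap record with (among others) an
# 'up' field, so the number of up osds can be computed as:
#   num_up = sum(1 for o in manager.get_osd_dump() if o['up'] > 0)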
1851
1852 def get_stuck_pgs(self, type_, threshold):
1853 """
1854 :returns: stuck pg information from the cluster
1855 """
1856 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
1857 '--format=json')
1858 return json.loads(out)
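# Usage sketch: query pgs stuck in a given state (e.g. 'inactive',
# 'unclean' or 'stale') for longer than the threshold in seconds:
#   stuck = manager.get_stuck_pgs('inactive', 300)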
1859
1860 def get_num_unfound_objects(self):
1861 """
1862 Check cluster status to get the number of unfound objects
1863 """
1864 status = self.raw_cluster_status()
1865 self.log(status)
1866 return status['pgmap'].get('unfound_objects', 0)
1867
1868 def get_num_creating(self):
1869 """
1870 Find the number of pgs in creating mode.
1871 """
1872 pgs = self.get_pg_stats()
1873 num = 0
1874 for pg in pgs:
1875 if 'creating' in pg['state']:
1876 num += 1
1877 return num
1878
1879 def get_num_active_clean(self):
1880 """
1881 Find the number of active and clean pgs.
1882 """
1883 pgs = self.get_pg_stats()
1884 num = 0
1885 for pg in pgs:
1886 if (pg['state'].count('active') and
1887 pg['state'].count('clean') and
1888 not pg['state'].count('stale')):
1889 num += 1
1890 return num
1891
1892 def get_num_active_recovered(self):
1893 """
1894 Find the number of active and recovered pgs.
1895 """
1896 pgs = self.get_pg_stats()
1897 num = 0
1898 for pg in pgs:
1899 if (pg['state'].count('active') and
1900 not pg['state'].count('recover') and
1901 not pg['state'].count('backfill') and
1902 not pg['state'].count('stale')):
1903 num += 1
1904 return num
1905
1906 def get_is_making_recovery_progress(self):
1907 """
1908 Return whether there is recovery progress discernible in the
1909 raw cluster status
1910 """
1911 status = self.raw_cluster_status()
1912 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
1913 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
1914 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
1915 return kps > 0 or bps > 0 or ops > 0
1916
1917 def get_num_active(self):
1918 """
1919 Find the number of active pgs.
1920 """
1921 pgs = self.get_pg_stats()
1922 num = 0
1923 for pg in pgs:
1924 if pg['state'].count('active') and not pg['state'].count('stale'):
1925 num += 1
1926 return num
1927
1928 def get_num_down(self):
1929 """
1930 Find the number of pgs that are down or incomplete (and not stale).
1931 """
1932 pgs = self.get_pg_stats()
1933 num = 0
1934 for pg in pgs:
1935 if ((pg['state'].count('down') and not
1936 pg['state'].count('stale')) or
1937 (pg['state'].count('incomplete') and not
1938 pg['state'].count('stale'))):
1939 num += 1
1940 return num
1941
1942 def get_num_active_down(self):
1943 """
1944 Find the number of pgs that are active, down, or incomplete (and not stale).
1945 """
1946 pgs = self.get_pg_stats()
1947 num = 0
1948 for pg in pgs:
1949 if ((pg['state'].count('active') and not
1950 pg['state'].count('stale')) or
1951 (pg['state'].count('down') and not
1952 pg['state'].count('stale')) or
1953 (pg['state'].count('incomplete') and not
1954 pg['state'].count('stale'))):
1955 num += 1
1956 return num
1957
1958 def is_clean(self):
1959 """
1960 True if all pgs are clean
1961 """
1962 return self.get_num_active_clean() == self.get_num_pgs()
1963
1964 def is_recovered(self):
1965 """
1966 True if all pgs have recovered
1967 """
1968 return self.get_num_active_recovered() == self.get_num_pgs()
1969
1970 def is_active_or_down(self):
1971 """
1972 True if all pgs are active or down
1973 """
1974 return self.get_num_active_down() == self.get_num_pgs()
1975
1976 def wait_for_clean(self, timeout=None):
1977 """
1978 Wait until all pgs are active and clean.
1979 """
1980 self.log("waiting for clean")
1981 start = time.time()
1982 num_active_clean = self.get_num_active_clean()
1983 while not self.is_clean():
1984 if timeout is not None:
1985 if self.get_is_making_recovery_progress():
1986 self.log("making progress, resetting timeout")
1987 start = time.time()
1988 else:
1989 self.log("no progress seen, keeping timeout for now")
1990 if time.time() - start >= timeout:
1991 self.log('dumping pgs')
1992 out = self.raw_cluster_cmd('pg', 'dump')
1993 self.log(out)
1994 assert time.time() - start < timeout, \
1995 'failed to become clean before timeout expired'
1996 cur_active_clean = self.get_num_active_clean()
1997 if cur_active_clean != num_active_clean:
1998 start = time.time()
1999 num_active_clean = cur_active_clean
2000 time.sleep(3)
2001 self.log("clean!")
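# Usage sketch: the timeout is reset whenever recovery progress or a
# change in the active+clean count is observed, so it bounds the time
# spent without progress rather than the total wait:
#   manager.wait_for_clean(timeout=900)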
2002
2003 def are_all_osds_up(self):
2004 """
2005 Returns true if all osds are up.
2006 """
2007 x = self.get_osd_dump()
2008 return (len(x) == sum([(y['up'] > 0) for y in x]))
2009
2010 def wait_for_all_up(self, timeout=None):
2011 """
2012 When this exits, either the timeout has expired, or all
2013 osds are up.
2014 """
2015 self.log("waiting for all up")
2016 start = time.time()
2017 while not self.are_all_osds_up():
2018 if timeout is not None:
2019 assert time.time() - start < timeout, \
2020 'timeout expired in wait_for_all_up'
2021 time.sleep(3)
2022 self.log("all up!")
2023
2024 def wait_for_recovery(self, timeout=None):
2025 """
2026 Wait for recovery to complete. When this returns, all pgs have recovered.
2027 """
2028 self.log("waiting for recovery to complete")
2029 start = time.time()
2030 num_active_recovered = self.get_num_active_recovered()
2031 while not self.is_recovered():
2032 now = time.time()
2033 if timeout is not None:
2034 if self.get_is_making_recovery_progress():
2035 self.log("making progress, resetting timeout")
2036 start = time.time()
2037 else:
2038 self.log("no progress seen, keeping timeout for now")
2039 if now - start >= timeout:
2040 self.log('dumping pgs')
2041 out = self.raw_cluster_cmd('pg', 'dump')
2042 self.log(out)
2043 assert now - start < timeout, \
2044 'failed to recover before timeout expired'
2045 cur_active_recovered = self.get_num_active_recovered()
2046 if cur_active_recovered != num_active_recovered:
2047 start = time.time()
2048 num_active_recovered = cur_active_recovered
2049 time.sleep(3)
2050 self.log("recovered!")
2051
2052 def wait_for_active(self, timeout=None):
2053 """
2054 Wait for peering to complete. When this returns, all pgs are active.
2055 """
2056 self.log("waiting for peering to complete")
2057 start = time.time()
2058 num_active = self.get_num_active()
2059 while not self.is_active():
2060 if timeout is not None:
2061 if time.time() - start >= timeout:
2062 self.log('dumping pgs')
2063 out = self.raw_cluster_cmd('pg', 'dump')
2064 self.log(out)
2065 assert time.time() - start < timeout, \
2066 'failed to become active before timeout expired'
2067 cur_active = self.get_num_active()
2068 if cur_active != num_active:
2069 start = time.time()
2070 num_active = cur_active
2071 time.sleep(3)
2072 self.log("active!")
2073
2074 def wait_for_active_or_down(self, timeout=None):
2075 """
2076 Wait for peering to complete or become blocked. When this
2077 returns, every pg is either active or down.
2078 """
2079 self.log("waiting for peering to complete or become blocked")
2080 start = time.time()
2081 num_active_down = self.get_num_active_down()
2082 while not self.is_active_or_down():
2083 if timeout is not None:
2084 if time.time() - start >= timeout:
2085 self.log('dumping pgs')
2086 out = self.raw_cluster_cmd('pg', 'dump')
2087 self.log(out)
2088 assert time.time() - start < timeout, \
2089 'failed to become active or down before timeout expired'
2090 cur_active_down = self.get_num_active_down()
2091 if cur_active_down != num_active_down:
2092 start = time.time()
2093 num_active_down = cur_active_down
2094 time.sleep(3)
2095 self.log("active or down!")
2096
2097 def osd_is_up(self, osd):
2098 """
2099 Check whether the given osd is marked up in the osd dump.
2100 """
2101 osds = self.get_osd_dump()
2102 return osds[osd]['up'] > 0
2103
2104 def wait_till_osd_is_up(self, osd, timeout=None):
2105 """
2106 Loop waiting for osd.
2107 """
2108 self.log('waiting for osd.%d to be up' % osd)
2109 start = time.time()
2110 while not self.osd_is_up(osd):
2111 if timeout is not None:
2112 assert time.time() - start < timeout, \
2113 'osd.%d failed to come up before timeout expired' % osd
2114 time.sleep(3)
2115 self.log('osd.%d is up' % osd)
2116
2117 def is_active(self):
2118 """
2119 Wrapper to check if all pgs are active
2120 """
2121 return self.get_num_active() == self.get_num_pgs()
2122
2123 def wait_till_active(self, timeout=None):
2124 """
2125 Wait until all pgs are active.
2126 """
2127 self.log("waiting till active")
2128 start = time.time()
2129 while not self.is_active():
2130 if timeout is not None:
2131 if time.time() - start >= timeout:
2132 self.log('dumping pgs')
2133 out = self.raw_cluster_cmd('pg', 'dump')
2134 self.log(out)
2135 assert time.time() - start < timeout, \
2136 'failed to become active before timeout expired'
2137 time.sleep(3)
2138 self.log("active!")
2139
2140 def mark_out_osd(self, osd):
2141 """
2142 Wrapper to mark osd out.
2143 """
2144 self.raw_cluster_cmd('osd', 'out', str(osd))
2145
2146 def kill_osd(self, osd):
2147 """
2148 Kill an osd by power cycling (if indicated by the config), by
2149 injecting a bdev crash (if configured), or by stopping the daemon.
2150 """
2151 if self.config.get('powercycle'):
2152 remote = self.find_remote('osd', osd)
2153 self.log('kill_osd on osd.{o} '
2154 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2155 self._assert_ipmi(remote)
2156 remote.console.power_off()
2157 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2158 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2159 self.raw_cluster_cmd(
2160 '--', 'tell', 'osd.%d' % osd,
2161 'injectargs',
2162 '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
2163 )
2164 try:
2165 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2166 except Exception:
2167 pass
2168 else:
2169 raise RuntimeError('osd.%s did not fail' % osd)
2170 else:
2171 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2172 else:
2173 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
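# Example sketch: a thrasher config fragment that makes kill_osd
# occasionally crash the osd's block device layer instead of doing a
# clean stop (keys as read above, values illustrative):
#   config = {'bdev_inject_crash': 2,
#             'bdev_inject_crash_probability': 0.5}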
2174
2175 @staticmethod
2176 def _assert_ipmi(remote):
2177 assert remote.console.has_ipmi_credentials, (
2178 "powercycling requested but RemoteConsole is not "
2179 "initialized. Check ipmi config.")
2180
2181 def blackhole_kill_osd(self, osd):
2182 """
2183 Stop osd if nothing else works.
2184 """
2185 self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
2186 'injectargs',
2187 '--objectstore-blackhole')
2188 time.sleep(2)
2189 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2190
2191 def revive_osd(self, osd, timeout=150, skip_admin_check=False):
2192 """
2193 Revive osds by either power cycling (if indicated by the config)
2194 or by restarting.
2195 """
2196 if self.config.get('powercycle'):
2197 remote = self.find_remote('osd', osd)
2198 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2199 format(o=osd, s=remote.name))
2200 self._assert_ipmi(remote)
2201 remote.console.power_on()
2202 if not remote.console.check_status(300):
2203 raise Exception('Failed to revive osd.{o} via ipmi'.
2204 format(o=osd))
2205 teuthology.reconnect(self.ctx, 60, [remote])
2206 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2207 self.make_admin_daemon_dir(remote)
2208 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2209 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2210
2211 if not skip_admin_check:
2212 # wait for dump_ops_in_flight; this command doesn't appear
2213 # until after the signal handler is installed and it is safe
2214 # to stop the osd again without making valgrind leak checks
2215 # unhappy. see #5924.
2216 self.wait_run_admin_socket('osd', osd,
2217 args=['dump_ops_in_flight'],
2218 timeout=timeout, stdout=DEVNULL)
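# Usage sketch: a typical thrash step kills an osd and later revives it,
# relying on the dump_ops_in_flight admin-socket check above to know the
# daemon is ready again:
#   manager.kill_osd(3)
#   # ... exercise the cluster while the osd is down ...
#   manager.revive_osd(3, timeout=150)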
2219
2220 def mark_down_osd(self, osd):
2221 """
2222 Cluster command wrapper
2223 """
2224 self.raw_cluster_cmd('osd', 'down', str(osd))
2225
2226 def mark_in_osd(self, osd):
2227 """
2228 Cluster command wrapper
2229 """
2230 self.raw_cluster_cmd('osd', 'in', str(osd))
2231
2232 def signal_osd(self, osd, sig, silent=False):
2233 """
2234 Wrapper to local get_daemon call which sends the given
2235 signal to the given osd.
2236 """
2237 self.ctx.daemons.get_daemon('osd', osd,
2238 self.cluster).signal(sig, silent=silent)
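# Usage sketch: deliver a signal to a specific osd daemon quietly, e.g. a
# SIGHUP (using the stdlib signal module):
#   manager.signal_osd(2, signal.SIGHUP, silent=True)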
2239
2240 ## monitors
2241 def signal_mon(self, mon, sig, silent=False):
2242 """
2243 Wrapper to local get_daemon call which sends the given
signal to the given mon.
2244 """
2245 self.ctx.daemons.get_daemon('mon', mon,
2246 self.cluster).signal(sig, silent=silent)
2247
2248 def kill_mon(self, mon):
2249 """
2250 Kill the monitor by either power cycling (if the config says so),
2251 or by doing a stop.
2252 """
2253 if self.config.get('powercycle'):
2254 remote = self.find_remote('mon', mon)
2255 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2256 format(m=mon, s=remote.name))
2257 self._assert_ipmi(remote)
2258 remote.console.power_off()
2259 else:
2260 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2261
2262 def revive_mon(self, mon):
2263 """
2264 Restart by either power cycling (if the config says so),
2265 or by doing a normal restart.
2266 """
2267 if self.config.get('powercycle'):
2268 remote = self.find_remote('mon', mon)
2269 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2270 format(m=mon, s=remote.name))
2271 self._assert_ipmi(remote)
2272 remote.console.power_on()
2273 self.make_admin_daemon_dir(remote)
2274 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2275
2276 def get_mon_status(self, mon):
2277 """
2278 Extract all the monitor status information from the cluster
2279 """
2280 addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
2281 out = self.raw_cluster_cmd('-m', addr, 'mon_status')
2282 return json.loads(out)
2283
2284 def get_mon_quorum(self):
2285 """
2286 Extract monitor quorum information from the cluster
2287 """
2288 out = self.raw_cluster_cmd('quorum_status')
2289 j = json.loads(out)
2290 self.log('quorum_status is %s' % out)
2291 return j['quorum']
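# Example sketch: the quorum is typically reported as a list of monitor
# ranks, so a healthy 3-mon cluster yields something like [0, 1, 2]:
#   assert len(manager.get_mon_quorum()) == 3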
2292
2293 def wait_for_mon_quorum_size(self, size, timeout=300):
2294 """
2295 Loop until quorum size is reached.
2296 """
2297 self.log('waiting for quorum size %d' % size)
2298 start = time.time()
2299 while not len(self.get_mon_quorum()) == size:
2300 if timeout is not None:
2301 assert time.time() - start < timeout, \
2302 ('failed to reach quorum size %d '
2303 'before timeout expired' % size)
2304 time.sleep(3)
2305 self.log("quorum is size %d" % size)
2306
2307 def get_mon_health(self, debug=False):
2308 """
2309 Extract all the monitor health information.
2310 """
2311 out = self.raw_cluster_cmd('health', '--format=json')
2312 if debug:
2313 self.log('health:\n{h}'.format(h=out))
2314 return json.loads(out)
2315
2316 def get_mds_status(self, mds):
2317 """
2318 Run cluster commands for the mds in order to get mds information
2319 """
2320 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
2321 j = json.loads(' '.join(out.splitlines()[1:]))
2322 # collate; for dup ids, larger gid wins.
2323 for info in j['info'].itervalues():
2324 if info['name'] == mds:
2325 return info
2326 return None
2327
2328 def get_filepath(self):
2329 """
2330 Return path to osd data with {id} needing to be replaced
2331 """
2332 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
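# Example sketch: substitute the osd id to get its data directory, e.g.
# for osd.5 in the default 'ceph' cluster:
#   manager.get_filepath().format(id=5)  # '/var/lib/ceph/osd/ceph-5'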
2333
2334 def make_admin_daemon_dir(self, remote):
2335 """
2336 Create /var/run/ceph directory on remote site.
2337
2339 :param remote: Remote site
2340 """
2341 remote.run(args=['sudo',
2342 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2343
2344
2345def utility_task(name):
2346 """
2347 Generate ceph_manager subtask corresponding to ceph_manager
2348 method name
2349 """
2350 def task(ctx, config):
2351 if config is None:
2352 config = {}
2353 args = config.get('args', [])
2354 kwargs = config.get('kwargs', {})
2355 cluster = config.get('cluster', 'ceph')
2356 fn = getattr(ctx.managers[cluster], name)
2357 fn(*args, **kwargs)
2358 return task
2359
2360revive_osd = utility_task("revive_osd")
2361revive_mon = utility_task("revive_mon")
2362kill_osd = utility_task("kill_osd")
2363kill_mon = utility_task("kill_mon")
2364create_pool = utility_task("create_pool")
2365remove_pool = utility_task("remove_pool")
2366wait_for_clean = utility_task("wait_for_clean")
2367set_pool_property = utility_task("set_pool_property")
2368do_pg_scrub = utility_task("do_pg_scrub")
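# Usage sketch (yaml shown as a comment; job fragment illustrative, exact
# wiring depends on the suite): the generated tasks can be invoked from a
# teuthology job, passing args/kwargs/cluster through the task config:
#   tasks:
#   - ceph_manager.wait_for_clean:
#       kwargs:
#         timeout: 300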