1"""
2ceph manager -- Thrasher and CephManager objects
3"""
4from cStringIO import StringIO
5from functools import wraps
6import contextlib
7import random
8import signal
9import time
10import gevent
11import base64
12import json
13import logging
14import threading
15import traceback
16import os
17from teuthology import misc as teuthology
18from tasks.scrub import Scrubber
19from util.rados import cmd_erasure_code_profile
20from util import get_remote
21from teuthology.contextutil import safe_while
22from teuthology.orchestra.remote import Remote
23from teuthology.orchestra import run
24from teuthology.exceptions import CommandFailedError
25
26try:
27 from subprocess import DEVNULL # py3k
28except ImportError:
29 DEVNULL = open(os.devnull, 'r+')
30
31DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33log = logging.getLogger(__name__)
34
35
36def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
37 conf_fp = StringIO()
38 ctx.ceph[cluster].conf.write(conf_fp)
39 conf_fp.seek(0)
40 writes = ctx.cluster.run(
41 args=[
42 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
44 'sudo', 'python',
45 '-c',
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
48 conf_path,
49 run.Raw('&&'),
50 'sudo', 'chmod', '0644', conf_path,
51 ],
52 stdin=run.PIPE,
53 wait=False)
54 teuthology.feed_many_stdins_and_close(conf_fp, writes)
55 run.wait(writes)
56
57
58def mount_osd_data(ctx, remote, cluster, osd):
59 """
60 Mount a remote OSD
61
62 :param ctx: Context
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
65 :param osd: Osd name
66 """
67 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68 role = "{0}.osd.{1}".format(cluster, osd)
69 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70 if remote in ctx.disk_config.remote_to_roles_to_dev:
71 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
72 role = alt_role
73 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
74 return
75 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76 mount_options = ctx.disk_config.\
77 remote_to_roles_to_dev_mount_options[remote][role]
78 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
80
 81 log.info('Mounting osd.{o}: dev: {n}, cluster: {c}, '
82 'mountpoint: {p}, type: {t}, options: {v}'.format(
83 o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
84 c=cluster))
85
86 remote.run(
87 args=[
88 'sudo',
89 'mount',
90 '-t', fstype,
91 '-o', ','.join(mount_options),
92 dev,
93 mnt,
94 ]
95 )
96
97
98class Thrasher:
99 """
 100 Object used to thrash Ceph: randomly kills, revives, marks out/in and reweights OSDs to exercise recovery paths while a workload runs
101 """
102 def __init__(self, manager, config, logger=None):
103 self.ceph_manager = manager
104 self.cluster = manager.cluster
105 self.ceph_manager.wait_for_clean()
106 osd_status = self.ceph_manager.get_osd_status()
107 self.in_osds = osd_status['in']
108 self.live_osds = osd_status['live']
109 self.out_osds = osd_status['out']
110 self.dead_osds = osd_status['dead']
111 self.stopping = False
112 self.logger = logger
113 self.config = config
 114 self.revive_timeout = self.config.get("revive_timeout", 360)
115 self.pools_to_fix_pgp_num = set()
116 if self.config.get('powercycle'):
117 self.revive_timeout += 120
118 self.clean_wait = self.config.get('clean_wait', 0)
 119 self.minin = self.config.get("min_in", 4)
120 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121 self.sighup_delay = self.config.get('sighup_delay')
122 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123 self.dump_ops_enable = self.config.get('dump_ops_enable')
124 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
 127 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
 128 self.random_eio = self.config.get('random_eio')
 129 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
130
131 num_osds = self.in_osds + self.out_osds
132 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
133 if self.logger is not None:
134 self.log = lambda x: self.logger.info(x)
135 else:
136 def tmp(x):
137 """
138 Implement log behavior
139 """
140 print x
141 self.log = tmp
142 if self.config is None:
143 self.config = dict()
144 # prevent monitor from auto-marking things out while thrasher runs
145 # try both old and new tell syntax, in case we are testing old code
146 self.saved_options = []
147 # assuming that the default settings do not vary from one daemon to
148 # another
149 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
150 opts = [('mon', 'mon_osd_down_out_interval', 0)]
151 for service, opt, new_value in opts:
152 old_value = manager.get_config(first_mon[0],
153 first_mon[1],
154 opt)
155 self.saved_options.append((service, opt, old_value))
156 self._set_config(service, '*', opt, new_value)
157 # initialize ceph_objectstore_tool property - must be done before
158 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
159 if (self.config.get('powercycle') or
160 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
161 self.config.get('disable_objectstore_tool_tests', False)):
162 self.ceph_objectstore_tool = False
163 self.test_rm_past_intervals = False
164 if self.config.get('powercycle'):
165 self.log("Unable to test ceph-objectstore-tool, "
166 "powercycle testing")
167 else:
168 self.log("Unable to test ceph-objectstore-tool, "
169 "not available on all OSD nodes")
170 else:
171 self.ceph_objectstore_tool = \
172 self.config.get('ceph_objectstore_tool', True)
173 self.test_rm_past_intervals = \
174 self.config.get('test_rm_past_intervals', True)
175 # spawn do_thrash
176 self.thread = gevent.spawn(self.do_thrash)
177 if self.sighup_delay:
178 self.sighup_thread = gevent.spawn(self.do_sighup)
179 if self.optrack_toggle_delay:
180 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
181 if self.dump_ops_enable == "true":
182 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
183 if self.noscrub_toggle_delay:
184 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
185
186 def _set_config(self, service_type, service_id, name, value):
187 opt_arg = '--{name} {value}'.format(name=name, value=value)
188 whom = '.'.join([service_type, service_id])
189 self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
190 'injectargs', opt_arg)
191
192
193 def cmd_exists_on_osds(self, cmd):
194 allremotes = self.ceph_manager.ctx.cluster.only(\
195 teuthology.is_type('osd', self.cluster)).remotes.keys()
196 allremotes = list(set(allremotes))
197 for remote in allremotes:
198 proc = remote.run(args=['type', cmd], wait=True,
199 check_status=False, stdout=StringIO(),
200 stderr=StringIO())
201 if proc.exitstatus != 0:
 202 return False
 203 return True
204
205 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
206 """
207 :param osd: Osd to be killed.
 208 :param mark_down: Mark down if true.
 209 :param mark_out: Mark out if true.
210 """
211 if osd is None:
212 osd = random.choice(self.live_osds)
213 self.log("Killing osd %s, live_osds are %s" % (str(osd),
214 str(self.live_osds)))
215 self.live_osds.remove(osd)
216 self.dead_osds.append(osd)
217 self.ceph_manager.kill_osd(osd)
218 if mark_down:
219 self.ceph_manager.mark_down_osd(osd)
220 if mark_out and osd in self.in_osds:
221 self.out_osd(osd)
222 if self.ceph_objectstore_tool:
223 self.log("Testing ceph-objectstore-tool on down osd")
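# Exercise ceph-objectstore-tool against the stopped osd: list its PGs,
# export a random one, remove it, then import it back (possibly into a
# different dead osd).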
224 remote = self.ceph_manager.find_remote('osd', osd)
225 FSPATH = self.ceph_manager.get_filepath()
226 JPATH = os.path.join(FSPATH, "journal")
227 exp_osd = imp_osd = osd
228 exp_remote = imp_remote = remote
229 # If an older osd is available we'll move a pg from there
230 if (len(self.dead_osds) > 1 and
231 random.random() < self.chance_move_pg):
232 exp_osd = random.choice(self.dead_osds[:-1])
233 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
234 if ('keyvaluestore_backend' in
235 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
236 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
237 "--data-path {fpath} --journal-path {jpath} "
238 "--type keyvaluestore "
239 "--log-file="
240 "/var/log/ceph/objectstore_tool.\\$pid.log ".
241 format(fpath=FSPATH, jpath=JPATH))
242 else:
243 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
244 "--data-path {fpath} --journal-path {jpath} "
245 "--log-file="
246 "/var/log/ceph/objectstore_tool.\\$pid.log ".
247 format(fpath=FSPATH, jpath=JPATH))
248 cmd = (prefix + "--op list-pgs").format(id=exp_osd)
249
250 # ceph-objectstore-tool might be temporarily absent during an
251 # upgrade - see http://tracker.ceph.com/issues/18014
252 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
253 while proceed():
254 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
255 wait=True, check_status=False, stdout=StringIO(),
256 stderr=StringIO())
257 if proc.exitstatus == 0:
258 break
259 log.debug("ceph-objectstore-tool binary not present, trying again")
260
261 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
262 # see http://tracker.ceph.com/issues/19556
263 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
264 while proceed():
265 proc = exp_remote.run(args=cmd, wait=True,
266 check_status=False,
267 stdout=StringIO(), stderr=StringIO())
268 if proc.exitstatus == 0:
269 break
 270 elif proc.exitstatus == 1 and "OSD has the store locked" in proc.stderr.getvalue():
271 continue
272 else:
273 raise Exception("ceph-objectstore-tool: "
274 "exp list-pgs failure with status {ret}".
275 format(ret=proc.exitstatus))
276
277 pgs = proc.stdout.getvalue().split('\n')[:-1]
278 if len(pgs) == 0:
279 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
280 return
281 pg = random.choice(pgs)
282 exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
283 exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
284 exp_path = os.path.join(exp_path,
285 "exp.{pg}.{id}".format(
286 pg=pg,
287 id=exp_osd))
288 # export
 289 # Can't use new export-remove op since this is part of upgrade testing
290 cmd = prefix + "--op export --pgid {pg} --file {file}"
291 cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
292 proc = exp_remote.run(args=cmd)
293 if proc.exitstatus:
294 raise Exception("ceph-objectstore-tool: "
295 "export failure with status {ret}".
296 format(ret=proc.exitstatus))
297 # remove
 298 cmd = prefix + "--force --op remove --pgid {pg}"
299 cmd = cmd.format(id=exp_osd, pg=pg)
300 proc = exp_remote.run(args=cmd)
301 if proc.exitstatus:
302 raise Exception("ceph-objectstore-tool: "
303 "remove failure with status {ret}".
304 format(ret=proc.exitstatus))
305 # If there are at least 2 dead osds we might move the pg
306 if exp_osd != imp_osd:
307 # If pg isn't already on this osd, then we will move it there
308 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
309 proc = imp_remote.run(args=cmd, wait=True,
310 check_status=False, stdout=StringIO())
311 if proc.exitstatus:
312 raise Exception("ceph-objectstore-tool: "
313 "imp list-pgs failure with status {ret}".
314 format(ret=proc.exitstatus))
315 pgs = proc.stdout.getvalue().split('\n')[:-1]
316 if pg not in pgs:
317 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
318 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
319 if imp_remote != exp_remote:
320 # Copy export file to the other machine
321 self.log("Transfer export file from {srem} to {trem}".
322 format(srem=exp_remote, trem=imp_remote))
323 tmpexport = Remote.get_file(exp_remote, exp_path)
324 Remote.put_file(imp_remote, tmpexport, exp_path)
325 os.remove(tmpexport)
326 else:
327 # Can't move the pg after all
328 imp_osd = exp_osd
329 imp_remote = exp_remote
330 # import
331 cmd = (prefix + "--op import --file {file}")
332 cmd = cmd.format(id=imp_osd, file=exp_path)
333 proc = imp_remote.run(args=cmd, wait=True, check_status=False,
334 stderr=StringIO())
335 if proc.exitstatus == 1:
336 bogosity = "The OSD you are using is older than the exported PG"
337 if bogosity in proc.stderr.getvalue():
338 self.log("OSD older than exported PG"
339 "...ignored")
340 elif proc.exitstatus == 10:
341 self.log("Pool went away before processing an import"
342 "...ignored")
343 elif proc.exitstatus == 11:
344 self.log("Attempt to import an incompatible export"
345 "...ignored")
346 elif proc.exitstatus:
347 raise Exception("ceph-objectstore-tool: "
348 "import failure with status {ret}".
349 format(ret=proc.exitstatus))
350 cmd = "rm -f {file}".format(file=exp_path)
351 exp_remote.run(args=cmd)
352 if imp_remote != exp_remote:
353 imp_remote.run(args=cmd)
354
355 # apply low split settings to each pool
356 for pool in self.ceph_manager.list_pools():
357 no_sudo_prefix = prefix[5:]
358 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
359 "--filestore-split-multiple 1' sudo -E "
360 + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
361 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
362 output = proc.stderr.getvalue()
363 if 'Couldn\'t find pool' in output:
364 continue
365 if proc.exitstatus:
366 raise Exception("ceph-objectstore-tool apply-layout-settings"
367 " failed with {status}".format(status=proc.exitstatus))
368
369 def rm_past_intervals(self, osd=None):
370 """
371 :param osd: Osd to find pg to remove past intervals
372 """
373 if self.test_rm_past_intervals:
374 if osd is None:
375 osd = random.choice(self.dead_osds)
376 self.log("Use ceph_objectstore_tool to remove past intervals")
377 remote = self.ceph_manager.find_remote('osd', osd)
378 FSPATH = self.ceph_manager.get_filepath()
379 JPATH = os.path.join(FSPATH, "journal")
380 if ('keyvaluestore_backend' in
381 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
382 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
383 "--data-path {fpath} --journal-path {jpath} "
384 "--type keyvaluestore "
385 "--log-file="
386 "/var/log/ceph/objectstore_tool.\\$pid.log ".
387 format(fpath=FSPATH, jpath=JPATH))
388 else:
389 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
390 "--data-path {fpath} --journal-path {jpath} "
391 "--log-file="
392 "/var/log/ceph/objectstore_tool.\\$pid.log ".
393 format(fpath=FSPATH, jpath=JPATH))
394 cmd = (prefix + "--op list-pgs").format(id=osd)
395 proc = remote.run(args=cmd, wait=True,
396 check_status=False, stdout=StringIO())
397 if proc.exitstatus:
398 raise Exception("ceph_objectstore_tool: "
399 "exp list-pgs failure with status {ret}".
400 format(ret=proc.exitstatus))
401 pgs = proc.stdout.getvalue().split('\n')[:-1]
402 if len(pgs) == 0:
403 self.log("No PGs found for osd.{osd}".format(osd=osd))
404 return
405 pg = random.choice(pgs)
406 cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
407 format(id=osd, pg=pg)
408 proc = remote.run(args=cmd)
409 if proc.exitstatus:
410 raise Exception("ceph_objectstore_tool: "
411 "rm-past-intervals failure with status {ret}".
412 format(ret=proc.exitstatus))
413
414 def blackhole_kill_osd(self, osd=None):
415 """
416 If all else fails, kill the osd.
417 :param osd: Osd to be killed.
418 """
419 if osd is None:
420 osd = random.choice(self.live_osds)
421 self.log("Blackholing and then killing osd %s, live_osds are %s" %
422 (str(osd), str(self.live_osds)))
423 self.live_osds.remove(osd)
424 self.dead_osds.append(osd)
425 self.ceph_manager.blackhole_kill_osd(osd)
426
427 def revive_osd(self, osd=None, skip_admin_check=False):
428 """
429 Revive the osd.
430 :param osd: Osd to be revived.
431 """
432 if osd is None:
433 osd = random.choice(self.dead_osds)
434 self.log("Reviving osd %s" % (str(osd),))
435 self.ceph_manager.revive_osd(
436 osd,
437 self.revive_timeout,
438 skip_admin_check=skip_admin_check)
439 self.dead_osds.remove(osd)
440 self.live_osds.append(osd)
 441 if self.random_eio > 0 and osd == self.rerrosd:
442 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
443 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
444 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
445 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
446
447
448 def out_osd(self, osd=None):
449 """
450 Mark the osd out
451 :param osd: Osd to be marked.
452 """
453 if osd is None:
454 osd = random.choice(self.in_osds)
455 self.log("Removing osd %s, in_osds are: %s" %
456 (str(osd), str(self.in_osds)))
457 self.ceph_manager.mark_out_osd(osd)
458 self.in_osds.remove(osd)
459 self.out_osds.append(osd)
460
461 def in_osd(self, osd=None):
462 """
 463 Mark the osd in
464 :param osd: Osd to be marked.
465 """
466 if osd is None:
467 osd = random.choice(self.out_osds)
468 if osd in self.dead_osds:
469 return self.revive_osd(osd)
470 self.log("Adding osd %s" % (str(osd),))
471 self.out_osds.remove(osd)
472 self.in_osds.append(osd)
473 self.ceph_manager.mark_in_osd(osd)
474 self.log("Added osd %s" % (str(osd),))
475
476 def reweight_osd_or_by_util(self, osd=None):
477 """
478 Reweight an osd that is in
479 :param osd: Osd to be marked.
480 """
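# Half the time (or whenever an osd is given) reweight that single osd
# to a random value; otherwise run (test-)reweight-by-utilization a few
# times with randomized parameters.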
481 if osd is not None or random.choice([True, False]):
482 if osd is None:
483 osd = random.choice(self.in_osds)
484 val = random.uniform(.1, 1.0)
485 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
486 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
487 str(osd), str(val))
488 else:
489 # do it several times, the option space is large
490 for i in range(5):
491 options = {
492 'max_change': random.choice(['0.05', '1.0', '3.0']),
493 'overage': random.choice(['110', '1000']),
494 'type': random.choice([
495 'reweight-by-utilization',
496 'test-reweight-by-utilization']),
497 }
498 self.log("Reweighting by: %s"%(str(options),))
499 self.ceph_manager.raw_cluster_cmd(
500 'osd',
501 options['type'],
502 options['overage'],
503 options['max_change'])
504
505 def primary_affinity(self, osd=None):
506 if osd is None:
507 osd = random.choice(self.in_osds)
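# 50% of the time use a random affinity, otherwise pin it to 1 or 0
# (25% each).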
508 if random.random() >= .5:
509 pa = random.random()
510 elif random.random() >= .5:
511 pa = 1
512 else:
513 pa = 0
514 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
515 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
516 str(osd), str(pa))
517
518 def thrash_cluster_full(self):
519 """
520 Set and unset cluster full condition
521 """
522 self.log('Setting full ratio to .001')
523 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
524 time.sleep(1)
525 self.log('Setting full ratio back to .95')
526 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
527
528 def thrash_pg_upmap(self):
529 """
530 Install or remove random pg_upmap entries in OSDMap
531 """
532 from random import shuffle
533 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
534 j = json.loads(out)
535 self.log('j is %s' % j)
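# ~70% of the time add a pg-upmap entry for a random pg, mapping it to
# a random set of osds sized to the pool's replica count; otherwise
# drop an existing entry.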
536 try:
537 if random.random() >= .3:
538 pgs = self.ceph_manager.get_pg_stats()
539 pg = random.choice(pgs)
540 pgid = str(pg['pgid'])
541 poolid = int(pgid.split('.')[0])
542 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
543 if len(sizes) == 0:
544 return
545 n = sizes[0]
546 osds = self.in_osds + self.out_osds
547 shuffle(osds)
548 osds = osds[0:n]
549 self.log('Setting %s to %s' % (pgid, osds))
550 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
551 self.log('cmd %s' % cmd)
552 self.ceph_manager.raw_cluster_cmd(*cmd)
553 else:
554 m = j['pg_upmap']
555 if len(m) > 0:
556 shuffle(m)
557 pg = m[0]['pgid']
558 self.log('Clearing pg_upmap on %s' % pg)
559 self.ceph_manager.raw_cluster_cmd(
560 'osd',
561 'rm-pg-upmap',
562 pg)
563 else:
564 self.log('No pg_upmap entries; doing nothing')
565 except CommandFailedError:
566 self.log('Failed to rm-pg-upmap, ignoring')
567
568 def thrash_pg_upmap_items(self):
569 """
570 Install or remove random pg_upmap_items entries in OSDMap
571 """
572 from random import shuffle
573 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
574 j = json.loads(out)
575 self.log('j is %s' % j)
576 try:
577 if random.random() >= .3:
578 pgs = self.ceph_manager.get_pg_stats()
579 pg = random.choice(pgs)
580 pgid = str(pg['pgid'])
581 poolid = int(pgid.split('.')[0])
582 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
583 if len(sizes) == 0:
584 return
585 n = sizes[0]
586 osds = self.in_osds + self.out_osds
587 shuffle(osds)
588 osds = osds[0:n*2]
589 self.log('Setting %s to %s' % (pgid, osds))
590 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
591 self.log('cmd %s' % cmd)
592 self.ceph_manager.raw_cluster_cmd(*cmd)
593 else:
594 m = j['pg_upmap_items']
595 if len(m) > 0:
596 shuffle(m)
597 pg = m[0]['pgid']
 598 self.log('Clearing pg_upmap_items on %s' % pg)
599 self.ceph_manager.raw_cluster_cmd(
600 'osd',
601 'rm-pg-upmap-items',
602 pg)
603 else:
 604 self.log('No pg_upmap_items entries; doing nothing')
605 except CommandFailedError:
606 self.log('Failed to rm-pg-upmap-items, ignoring')
607
608 def force_recovery(self):
609 """
 610 Force recovery on some PGs
611 """
612 backfill = random.random() >= 0.5
613 j = self.ceph_manager.get_pgids_to_force(backfill)
614 if j:
615 if backfill:
616 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
617 else:
618 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
619
620 def cancel_force_recovery(self):
621 """
 622 Cancel forced recovery on some PGs
623 """
624 backfill = random.random() >= 0.5
625 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
626 if j:
627 if backfill:
628 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
629 else:
630 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
631
632 def force_cancel_recovery(self):
633 """
634 Force or cancel forcing recovery
635 """
636 if random.random() >= 0.4:
637 self.force_recovery()
638 else:
639 self.cancel_force_recovery()
640
641 def all_up(self):
642 """
643 Make sure all osds are up and not out.
644 """
645 while len(self.dead_osds) > 0:
646 self.log("reviving osd")
647 self.revive_osd()
648 while len(self.out_osds) > 0:
649 self.log("inning osd")
650 self.in_osd()
651
652 def all_up_in(self):
653 """
654 Make sure all osds are up and fully in.
655 """
 656 self.all_up()
657 for osd in self.live_osds:
658 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
659 str(osd), str(1))
660 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
661 str(osd), str(1))
662
663 def do_join(self):
664 """
665 Break out of this Ceph loop
666 """
667 self.stopping = True
668 self.thread.get()
669 if self.sighup_delay:
670 self.log("joining the do_sighup greenlet")
671 self.sighup_thread.get()
672 if self.optrack_toggle_delay:
673 self.log("joining the do_optrack_toggle greenlet")
674 self.optrack_toggle_thread.join()
675 if self.dump_ops_enable == "true":
676 self.log("joining the do_dump_ops greenlet")
677 self.dump_ops_thread.join()
678 if self.noscrub_toggle_delay:
679 self.log("joining the do_noscrub_toggle greenlet")
680 self.noscrub_toggle_thread.join()
681
682 def grow_pool(self):
683 """
684 Increase the size of the pool
685 """
686 pool = self.ceph_manager.get_pool()
687 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
688 self.log("Growing pool %s" % (pool,))
689 if self.ceph_manager.expand_pool(pool,
690 self.config.get('pool_grow_by', 10),
691 self.max_pgs):
692 self.pools_to_fix_pgp_num.add(pool)
693
694 def fix_pgp_num(self, pool=None):
695 """
696 Fix number of pgs in pool.
697 """
698 if pool is None:
699 pool = self.ceph_manager.get_pool()
700 force = False
701 else:
702 force = True
703 self.log("fixing pg num pool %s" % (pool,))
704 if self.ceph_manager.set_pool_pgpnum(pool, force):
705 self.pools_to_fix_pgp_num.discard(pool)
706
707 def test_pool_min_size(self):
708 """
709 Kill and revive all osds except one.
710 """
711 self.log("test_pool_min_size")
712 self.all_up()
713 self.ceph_manager.wait_for_recovery(
714 timeout=self.config.get('timeout')
715 )
716 the_one = random.choice(self.in_osds)
 717 self.log("Killing everyone but %s" % (the_one,))
718 to_kill = filter(lambda x: x != the_one, self.in_osds)
719 [self.kill_osd(i) for i in to_kill]
720 [self.out_osd(i) for i in to_kill]
721 time.sleep(self.config.get("test_pool_min_size_time", 10))
722 self.log("Killing %s" % (the_one,))
723 self.kill_osd(the_one)
724 self.out_osd(the_one)
725 self.log("Reviving everyone but %s" % (the_one,))
726 [self.revive_osd(i) for i in to_kill]
727 [self.in_osd(i) for i in to_kill]
728 self.log("Revived everyone but %s" % (the_one,))
729 self.log("Waiting for clean")
730 self.ceph_manager.wait_for_recovery(
731 timeout=self.config.get('timeout')
732 )
733
734 def inject_pause(self, conf_key, duration, check_after, should_be_down):
735 """
736 Pause injection testing. Check for osd being down when finished.
737 """
738 the_one = random.choice(self.live_osds)
739 self.log("inject_pause on {osd}".format(osd=the_one))
740 self.log(
741 "Testing {key} pause injection for duration {duration}".format(
742 key=conf_key,
743 duration=duration
744 ))
745 self.log(
746 "Checking after {after}, should_be_down={shouldbedown}".format(
747 after=check_after,
748 shouldbedown=should_be_down
749 ))
750 self.ceph_manager.set_config(the_one, **{conf_key: duration})
751 if not should_be_down:
752 return
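# For long pauses the osd should be reported down while the injected
# pause is in effect, and come back up shortly after it expires.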
753 time.sleep(check_after)
754 status = self.ceph_manager.get_osd_status()
755 assert the_one in status['down']
756 time.sleep(duration - check_after + 20)
757 status = self.ceph_manager.get_osd_status()
 758 assert the_one not in status['down']
759
760 def test_backfill_full(self):
761 """
762 Test backfills stopping when the replica fills up.
763
 764 First, use the injectfull admin socket command to make all of
 765 the OSDs report as backfillfull.
766
767 Second, on a random subset, set
768 osd_debug_skip_full_check_in_backfill_reservation to force
769 the more complicated check in do_scan to be exercised.
770
 771 Then, verify that all backfilling stops.
772 """
773 self.log("injecting backfill full")
774 for i in self.live_osds:
775 self.ceph_manager.set_config(
776 i,
777 osd_debug_skip_full_check_in_backfill_reservation=
778 random.choice(['false', 'true']))
779 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
780 check_status=True, timeout=30, stdout=DEVNULL)
781 for i in range(30):
782 status = self.ceph_manager.compile_pg_status()
 783 if 'backfilling' not in status.keys():
784 break
785 self.log(
786 "waiting for {still_going} backfillings".format(
787 still_going=status.get('backfilling')))
 788 time.sleep(1)
 789 assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
790 for i in self.live_osds:
791 self.ceph_manager.set_config(
792 i,
793 osd_debug_skip_full_check_in_backfill_reservation='false')
794 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
795 check_status=True, timeout=30, stdout=DEVNULL)
796
797 def test_map_discontinuity(self):
798 """
799 1) Allows the osds to recover
800 2) kills an osd
801 3) allows the remaining osds to recover
802 4) waits for some time
803 5) revives the osd
804 This sequence should cause the revived osd to have to handle
 805 a map gap, since the mons will have trimmed old osdmaps by then
806 """
807 while len(self.in_osds) < (self.minin + 1):
808 self.in_osd()
809 self.log("Waiting for recovery")
 810 self.ceph_manager.wait_for_all_osds_up(
811 timeout=self.config.get('timeout')
812 )
813 # now we wait 20s for the pg status to change, if it takes longer,
814 # the test *should* fail!
815 time.sleep(20)
816 self.ceph_manager.wait_for_clean(
817 timeout=self.config.get('timeout')
818 )
819
820 # now we wait 20s for the backfill replicas to hear about the clean
821 time.sleep(20)
822 self.log("Recovered, killing an osd")
823 self.kill_osd(mark_down=True, mark_out=True)
824 self.log("Waiting for clean again")
825 self.ceph_manager.wait_for_clean(
826 timeout=self.config.get('timeout')
827 )
828 self.log("Waiting for trim")
829 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
830 self.revive_osd()
831
832 def choose_action(self):
833 """
834 Random action selector.
835 """
836 chance_down = self.config.get('chance_down', 0.4)
837 chance_test_min_size = self.config.get('chance_test_min_size', 0)
838 chance_test_backfill_full = \
839 self.config.get('chance_test_backfill_full', 0)
840 if isinstance(chance_down, int):
841 chance_down = float(chance_down) / 100
842 minin = self.minin
843 minout = self.config.get("min_out", 0)
844 minlive = self.config.get("min_live", 2)
845 mindead = self.config.get("min_dead", 0)
846
847 self.log('choose_action: min_in %d min_out '
848 '%d min_live %d min_dead %d' %
849 (minin, minout, minlive, mindead))
850 actions = []
851 if len(self.in_osds) > minin:
852 actions.append((self.out_osd, 1.0,))
853 if len(self.live_osds) > minlive and chance_down > 0:
854 actions.append((self.kill_osd, chance_down,))
855 if len(self.dead_osds) > 1:
856 actions.append((self.rm_past_intervals, 1.0,))
857 if len(self.out_osds) > minout:
858 actions.append((self.in_osd, 1.7,))
859 if len(self.dead_osds) > mindead:
860 actions.append((self.revive_osd, 1.0,))
861 if self.config.get('thrash_primary_affinity', True):
862 actions.append((self.primary_affinity, 1.0,))
863 actions.append((self.reweight_osd_or_by_util,
864 self.config.get('reweight_osd', .5),))
865 actions.append((self.grow_pool,
866 self.config.get('chance_pgnum_grow', 0),))
867 actions.append((self.fix_pgp_num,
868 self.config.get('chance_pgpnum_fix', 0),))
869 actions.append((self.test_pool_min_size,
870 chance_test_min_size,))
871 actions.append((self.test_backfill_full,
872 chance_test_backfill_full,))
873 if self.chance_thrash_cluster_full > 0:
874 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
875 if self.chance_thrash_pg_upmap > 0:
876 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
877 if self.chance_thrash_pg_upmap_items > 0:
878 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
879 if self.chance_force_recovery > 0:
880 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
881
882 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
883 for scenario in [
884 (lambda:
885 self.inject_pause(key,
886 self.config.get('pause_short', 3),
887 0,
888 False),
889 self.config.get('chance_inject_pause_short', 1),),
890 (lambda:
891 self.inject_pause(key,
892 self.config.get('pause_long', 80),
893 self.config.get('pause_check_after', 70),
894 True),
895 self.config.get('chance_inject_pause_long', 0),)]:
896 actions.append(scenario)
897
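# Weighted random selection: pick a point in [0, total) and walk the
# action list until the cumulative weight passes it.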
898 total = sum([y for (x, y) in actions])
899 val = random.uniform(0, total)
900 for (action, prob) in actions:
901 if val < prob:
902 return action
903 val -= prob
904 return None
905
906 def log_exc(func):
907 @wraps(func)
908 def wrapper(self):
909 try:
910 return func(self)
911 except:
912 self.log(traceback.format_exc())
913 raise
914 return wrapper
915
916 @log_exc
917 def do_sighup(self):
918 """
919 Loops and sends signal.SIGHUP to a random live osd.
920
921 Loop delay is controlled by the config value sighup_delay.
922 """
923 delay = float(self.sighup_delay)
924 self.log("starting do_sighup with a delay of {0}".format(delay))
925 while not self.stopping:
926 osd = random.choice(self.live_osds)
927 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
928 time.sleep(delay)
929
930 @log_exc
931 def do_optrack_toggle(self):
932 """
933 Loops and toggle op tracking to all osds.
934
935 Loop delay is controlled by the config value optrack_toggle_delay.
936 """
937 delay = float(self.optrack_toggle_delay)
938 osd_state = "true"
939 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
940 while not self.stopping:
941 if osd_state == "true":
942 osd_state = "false"
943 else:
944 osd_state = "true"
945 self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
946 'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
947 gevent.sleep(delay)
948
949 @log_exc
950 def do_dump_ops(self):
951 """
952 Loops and does op dumps on all osds
953 """
954 self.log("starting do_dump_ops")
955 while not self.stopping:
956 for osd in self.live_osds:
957 # Ignore errors because live_osds is in flux
958 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
959 check_status=False, timeout=30, stdout=DEVNULL)
960 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
961 check_status=False, timeout=30, stdout=DEVNULL)
962 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
963 check_status=False, timeout=30, stdout=DEVNULL)
964 gevent.sleep(0)
965
966 @log_exc
967 def do_noscrub_toggle(self):
968 """
969 Loops and toggle noscrub flags
970
971 Loop delay is controlled by the config value noscrub_toggle_delay.
972 """
973 delay = float(self.noscrub_toggle_delay)
974 scrub_state = "none"
975 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
976 while not self.stopping:
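# Cycle: none -> noscrub -> noscrub+nodeep-scrub -> nodeep-scrub -> none,
# advancing one step per delay.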
977 if scrub_state == "none":
978 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
979 scrub_state = "noscrub"
980 elif scrub_state == "noscrub":
981 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
982 scrub_state = "both"
983 elif scrub_state == "both":
984 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
985 scrub_state = "nodeep-scrub"
986 else:
987 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
988 scrub_state = "none"
989 gevent.sleep(delay)
990 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
991 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
992
993 @log_exc
994 def do_thrash(self):
995 """
996 Loop to select random actions to thrash ceph manager with.
997 """
998 cleanint = self.config.get("clean_interval", 60)
999 scrubint = self.config.get("scrub_interval", -1)
1000 maxdead = self.config.get("max_dead", 0)
1001 delay = self.config.get("op_delay", 5)
1002 self.rerrosd = self.live_osds[0]
1003 if self.random_eio > 0:
1004 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1005 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
1006 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1007 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
1008 self.log("starting do_thrash")
1009 while not self.stopping:
1010 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1011 "out_osds: ", self.out_osds,
1012 "dead_osds: ", self.dead_osds,
1013 "live_osds: ", self.live_osds]]
1014 self.log(" ".join(to_log))
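# Roughly once per clean_interval (in expectation), revive osds above
# max_dead, reset weights and wait for the cluster to recover before
# thrashing further.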
1015 if random.uniform(0, 1) < (float(delay) / cleanint):
1016 while len(self.dead_osds) > maxdead:
1017 self.revive_osd()
1018 for osd in self.in_osds:
1019 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1020 str(osd), str(1))
1021 if random.uniform(0, 1) < float(
1022 self.config.get('chance_test_map_discontinuity', 0)):
1023 self.test_map_discontinuity()
1024 else:
1025 self.ceph_manager.wait_for_recovery(
1026 timeout=self.config.get('timeout')
1027 )
1028 time.sleep(self.clean_wait)
1029 if scrubint > 0:
1030 if random.uniform(0, 1) < (float(delay) / scrubint):
1031 self.log('Scrubbing while thrashing being performed')
1032 Scrubber(self.ceph_manager, self.config)
1033 self.choose_action()()
1034 time.sleep(delay)
 1035 self.all_up()
1036 if self.random_eio > 0:
1037 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1038 'injectargs', '--', '--filestore_debug_random_read_err=0.0')
1039 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1040 'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
1041 for pool in list(self.pools_to_fix_pgp_num):
1042 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1043 self.fix_pgp_num(pool)
1044 self.pools_to_fix_pgp_num.clear()
1045 for service, opt, saved_value in self.saved_options:
1046 self._set_config(service, '*', opt, saved_value)
1047 self.saved_options = []
 1048 self.all_up_in()
1049
1050
1051class ObjectStoreTool:
1052
1053 def __init__(self, manager, pool, **kwargs):
1054 self.manager = manager
1055 self.pool = pool
1056 self.osd = kwargs.get('osd', None)
1057 self.object_name = kwargs.get('object_name', None)
1058 self.do_revive = kwargs.get('do_revive', True)
1059 if self.osd and self.pool and self.object_name:
1060 if self.osd == "primary":
1061 self.osd = self.manager.get_object_primary(self.pool,
1062 self.object_name)
1063 assert self.osd
1064 if self.object_name:
1065 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1066 self.object_name,
1067 self.osd)
1068 self.remote = self.manager.ctx.\
1069 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1070 path = self.manager.get_filepath().format(id=self.osd)
1071 self.paths = ("--data-path {path} --journal-path {path}/journal".
1072 format(path=path))
1073
1074 def build_cmd(self, options, args, stdin):
1075 lines = []
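# If an object name was given, resolve its full JSON listing entry via
# '--op list' first, so the requested operation can be applied to that
# exact object within the pg.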
1076 if self.object_name:
1077 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1078 "{paths} --pgid {pgid} --op list |"
1079 "grep '\"oid\":\"{name}\"')".
1080 format(paths=self.paths,
1081 pgid=self.pgid,
1082 name=self.object_name))
1083 args = '"$object" ' + args
1084 options += " --pgid {pgid}".format(pgid=self.pgid)
1085 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1086 format(paths=self.paths,
1087 args=args,
1088 options=options))
1089 if stdin:
1090 cmd = ("echo {payload} | base64 --decode | {cmd}".
 1091 format(payload=base64.b64encode(stdin),
1092 cmd=cmd))
1093 lines.append(cmd)
1094 return "\n".join(lines)
1095
1096 def run(self, options, args, stdin=None, stdout=None):
1097 if stdout is None:
1098 stdout = StringIO()
1099 self.manager.kill_osd(self.osd)
1100 cmd = self.build_cmd(options, args, stdin)
1101 self.manager.log(cmd)
1102 try:
1103 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1104 check_status=False,
1105 stdout=stdout,
1106 stderr=StringIO())
1107 proc.wait()
1108 if proc.exitstatus != 0:
1109 self.manager.log("failed with " + str(proc.exitstatus))
1110 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1111 raise Exception(error)
1112 finally:
1113 if self.do_revive:
1114 self.manager.revive_osd(self.osd)
 1115 self.manager.wait_till_osd_is_up(self.osd, 300)
1116
1117
1118class CephManager:
1119 """
1120 Ceph manager object.
1121 Contains several local functions that form a bulk of this module.
1122
1123 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1124 the same name.
1125 """
1126
1127 REPLICATED_POOL = 1
1128 ERASURE_CODED_POOL = 3
1129
1130 def __init__(self, controller, ctx=None, config=None, logger=None,
1131 cluster='ceph'):
1132 self.lock = threading.RLock()
1133 self.ctx = ctx
1134 self.config = config
1135 self.controller = controller
1136 self.next_pool_id = 0
1137 self.cluster = cluster
1138 if (logger):
1139 self.log = lambda x: logger.info(x)
1140 else:
1141 def tmp(x):
1142 """
1143 implement log behavior.
1144 """
1145 print x
1146 self.log = tmp
1147 if self.config is None:
1148 self.config = dict()
1149 pools = self.list_pools()
1150 self.pools = {}
1151 for pool in pools:
1152 # we may race with a pool deletion; ignore failures here
1153 try:
1154 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1155 except CommandFailedError:
1156 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1157
1158 def raw_cluster_cmd(self, *args):
1159 """
 1160 Run a 'ceph' command against the cluster and return its stdout.
1161 """
1162 testdir = teuthology.get_testdir(self.ctx)
1163 ceph_args = [
1164 'sudo',
1165 'adjust-ulimits',
1166 'ceph-coverage',
1167 '{tdir}/archive/coverage'.format(tdir=testdir),
1168 'timeout',
1169 '120',
1170 'ceph',
1171 '--cluster',
1172 self.cluster,
1173 ]
1174 ceph_args.extend(args)
1175 proc = self.controller.run(
1176 args=ceph_args,
1177 stdout=StringIO(),
1178 )
1179 return proc.stdout.getvalue()
1180
1181 def raw_cluster_cmd_result(self, *args):
1182 """
 1183 Run a 'ceph' command against the cluster and return its exit status.
1184 """
1185 testdir = teuthology.get_testdir(self.ctx)
1186 ceph_args = [
1187 'sudo',
1188 'adjust-ulimits',
1189 'ceph-coverage',
1190 '{tdir}/archive/coverage'.format(tdir=testdir),
1191 'timeout',
1192 '120',
1193 'ceph',
1194 '--cluster',
1195 self.cluster,
1196 ]
1197 ceph_args.extend(args)
1198 proc = self.controller.run(
1199 args=ceph_args,
1200 check_status=False,
1201 )
1202 return proc.exitstatus
1203
1204 def run_ceph_w(self):
1205 """
1206 Execute "ceph -w" in the background with stdout connected to a StringIO,
1207 and return the RemoteProcess.
1208 """
1209 return self.controller.run(
1210 args=["sudo",
1211 "daemon-helper",
1212 "kill",
1213 "ceph",
1214 '--cluster',
1215 self.cluster,
1216 "-w"],
1217 wait=False, stdout=StringIO(), stdin=run.PIPE)
1218
 1219 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1220 """
1221 Flush pg stats from a list of OSD ids, ensuring they are reflected
1222 all the way to the monitor. Luminous and later only.
1223
1224 :param osds: list of OSDs to flush
1225 :param no_wait: list of OSDs not to wait for seq id. by default, we
1226 wait for all specified osds, but some of them could be
1227 moved out of osdmap, so we cannot get their updated
1228 stat seq from monitor anymore. in that case, you need
1229 to pass a blacklist.
1230 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
 1231 it. (5 min by default)
1232 """
1233 seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
1234 for osd in osds}
1235 if not wait_for_mon:
1236 return
1237 if no_wait is None:
1238 no_wait = []
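# Each flush_pg_stats call returns the stat sequence number the osd
# flushed up to; poll 'osd last-stat-seq' until the monitor has caught
# up to that sequence for every osd we are waiting on.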
1239 for osd, need in seq.iteritems():
1240 if osd in no_wait:
1241 continue
1242 got = 0
1243 while wait_for_mon > 0:
1244 got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
1245 self.log('need seq {need} got {got} for osd.{osd}'.format(
1246 need=need, got=got, osd=osd))
1247 if got >= need:
1248 break
1249 A_WHILE = 1
1250 time.sleep(A_WHILE)
1251 wait_for_mon -= A_WHILE
1252 else:
1253 raise Exception('timed out waiting for mon to be updated with '
1254 'osd.{osd}: {got} < {need}'.
1255 format(osd=osd, got=got, need=need))
1256
1257 def flush_all_pg_stats(self):
1258 self.flush_pg_stats(range(len(self.get_osd_dump())))
1259
1260 def do_rados(self, remote, cmd, check_status=True):
1261 """
1262 Execute a remote rados command.
1263 """
1264 testdir = teuthology.get_testdir(self.ctx)
1265 pre = [
1266 'adjust-ulimits',
1267 'ceph-coverage',
1268 '{tdir}/archive/coverage'.format(tdir=testdir),
1269 'rados',
1270 '--cluster',
1271 self.cluster,
1272 ]
1273 pre.extend(cmd)
1274 proc = remote.run(
1275 args=pre,
1276 wait=True,
1277 check_status=check_status
1278 )
1279 return proc
1280
1281 def rados_write_objects(self, pool, num_objects, size,
1282 timelimit, threads, cleanup=False):
1283 """
 1284 Write objects to a pool with 'rados bench write'.
 1285 The threads parameter is not used yet.
1286 """
1287 args = [
1288 '-p', pool,
1289 '--num-objects', num_objects,
1290 '-b', size,
1291 'bench', timelimit,
1292 'write'
1293 ]
1294 if not cleanup:
1295 args.append('--no-cleanup')
1296 return self.do_rados(self.controller, map(str, args))
1297
1298 def do_put(self, pool, obj, fname, namespace=None):
1299 """
1300 Implement rados put operation
1301 """
1302 args = ['-p', pool]
1303 if namespace is not None:
1304 args += ['-N', namespace]
1305 args += [
1306 'put',
1307 obj,
1308 fname
1309 ]
1310 return self.do_rados(
1311 self.controller,
1312 args,
1313 check_status=False
1314 ).exitstatus
1315
1316 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1317 """
1318 Implement rados get operation
1319 """
1320 args = ['-p', pool]
1321 if namespace is not None:
1322 args += ['-N', namespace]
1323 args += [
1324 'get',
1325 obj,
1326 fname
1327 ]
1328 return self.do_rados(
1329 self.controller,
1330 args,
1331 check_status=False
1332 ).exitstatus
1333
1334 def do_rm(self, pool, obj, namespace=None):
1335 """
1336 Implement rados rm operation
1337 """
1338 args = ['-p', pool]
1339 if namespace is not None:
1340 args += ['-N', namespace]
1341 args += [
1342 'rm',
1343 obj
1344 ]
1345 return self.do_rados(
1346 self.controller,
1347 args,
1348 check_status=False
1349 ).exitstatus
1350
1351 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1352 if stdout is None:
1353 stdout = StringIO()
1354 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1355
1356 def find_remote(self, service_type, service_id):
1357 """
1358 Get the Remote for the host where a particular service runs.
1359
1360 :param service_type: 'mds', 'osd', 'client'
1361 :param service_id: The second part of a role, e.g. '0' for
1362 the role 'client.0'
1363 :return: a Remote instance for the host where the
1364 requested role is placed
1365 """
1366 return get_remote(self.ctx, self.cluster,
1367 service_type, service_id)
1368
1369 def admin_socket(self, service_type, service_id,
1370 command, check_status=True, timeout=0, stdout=None):
1371 """
 1372 Remotely run a ceph command against the daemon's admin socket
1373 :param command: a list of words to use as the command
1374 to the admin socket
1375 """
1376 if stdout is None:
1377 stdout = StringIO()
1378 testdir = teuthology.get_testdir(self.ctx)
1379 remote = self.find_remote(service_type, service_id)
1380 args = [
1381 'sudo',
1382 'adjust-ulimits',
1383 'ceph-coverage',
1384 '{tdir}/archive/coverage'.format(tdir=testdir),
1385 'timeout',
1386 str(timeout),
1387 'ceph',
1388 '--cluster',
1389 self.cluster,
1390 '--admin-daemon',
1391 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1392 cluster=self.cluster,
1393 type=service_type,
1394 id=service_id),
1395 ]
1396 args.extend(command)
1397 return remote.run(
1398 args=args,
1399 stdout=stdout,
1400 wait=True,
1401 check_status=check_status
1402 )
1403
1404 def objectstore_tool(self, pool, options, args, **kwargs):
1405 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1406
1407 def get_pgid(self, pool, pgnum):
1408 """
1409 :param pool: pool name
1410 :param pgnum: pg number
1411 :returns: a string representing this pg.
1412 """
1413 poolnum = self.get_pool_num(pool)
1414 pg_str = "{poolnum}.{pgnum}".format(
1415 poolnum=poolnum,
1416 pgnum=pgnum)
1417 return pg_str
1418
1419 def get_pg_replica(self, pool, pgnum):
1420 """
1421 get replica for pool, pgnum (e.g. (data, 0)->0
1422 """
1423 pg_str = self.get_pgid(pool, pgnum)
1424 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1425 j = json.loads('\n'.join(output.split('\n')[1:]))
1426 return int(j['acting'][-1])
1427 assert False
1428
1429 def wait_for_pg_stats(func):
1430 # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1431 # by default, and take the faulty injection in ms into consideration,
1432 # 12 seconds are more than enough
1433 delays = [1, 1, 2, 3, 5, 8, 13]
1434 @wraps(func)
1435 def wrapper(self, *args, **kwargs):
1436 exc = None
1437 for delay in delays:
1438 try:
1439 return func(self, *args, **kwargs)
1440 except AssertionError as e:
1441 time.sleep(delay)
1442 exc = e
1443 raise exc
1444 return wrapper
1445
1446 def get_pg_primary(self, pool, pgnum):
1447 """
 1448 get the primary osd for (pool, pgnum), e.g. (data, 0) -> 0
1449 """
1450 pg_str = self.get_pgid(pool, pgnum)
1451 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1452 j = json.loads('\n'.join(output.split('\n')[1:]))
1453 return int(j['acting'][0])
1454 assert False
1455
1456 def get_pool_num(self, pool):
1457 """
1458 get number for pool (e.g., data -> 2)
1459 """
1460 return int(self.get_pool_dump(pool)['pool'])
1461
1462 def list_pools(self):
1463 """
1464 list all pool names
1465 """
1466 osd_dump = self.get_osd_dump_json()
1467 self.log(osd_dump['pools'])
1468 return [str(i['pool_name']) for i in osd_dump['pools']]
1469
1470 def clear_pools(self):
1471 """
1472 remove all pools
1473 """
1474 [self.remove_pool(i) for i in self.list_pools()]
1475
1476 def kick_recovery_wq(self, osdnum):
1477 """
1478 Run kick_recovery_wq on cluster.
1479 """
1480 return self.raw_cluster_cmd(
1481 'tell', "osd.%d" % (int(osdnum),),
1482 'debug',
1483 'kick_recovery_wq',
1484 '0')
1485
1486 def wait_run_admin_socket(self, service_type,
1487 service_id, args=['version'], timeout=75, stdout=None):
1488 """
 1489 If osd_admin_socket call succeeds, return. Otherwise wait
1490 five seconds and try again.
1491 """
1492 if stdout is None:
1493 stdout = StringIO()
1494 tries = 0
1495 while True:
1496 proc = self.admin_socket(service_type, service_id,
1497 args, check_status=False, stdout=stdout)
 1498 if proc.exitstatus == 0:
1499 return proc
1500 else:
1501 tries += 1
1502 if (tries * 5) > timeout:
1503 raise Exception('timed out waiting for admin_socket '
1504 'to appear after {type}.{id} restart'.
1505 format(type=service_type,
1506 id=service_id))
1507 self.log("waiting on admin_socket for {type}-{id}, "
1508 "{command}".format(type=service_type,
1509 id=service_id,
1510 command=args))
1511 time.sleep(5)
1512
1513 def get_pool_dump(self, pool):
1514 """
1515 get the osd dump part of a pool
1516 """
1517 osd_dump = self.get_osd_dump_json()
1518 for i in osd_dump['pools']:
1519 if i['pool_name'] == pool:
1520 return i
1521 assert False
1522
1523 def get_config(self, service_type, service_id, name):
1524 """
 1525 :param service_type: daemon type, e.g. 'mon'; service_id: e.g. 'a'
1526 :param name: the option name
1527 """
1528 proc = self.wait_run_admin_socket(service_type, service_id,
1529 ['config', 'show'])
1530 j = json.loads(proc.stdout.getvalue())
1531 return j[name]
1532
1533 def set_config(self, osdnum, **argdict):
1534 """
1535 :param osdnum: osd number
1536 :param argdict: dictionary containing values to set.
1537 """
1538 for k, v in argdict.iteritems():
1539 self.wait_run_admin_socket(
1540 'osd', osdnum,
1541 ['config', 'set', str(k), str(v)])
1542
1543 def raw_cluster_status(self):
1544 """
1545 Get status from cluster
1546 """
1547 status = self.raw_cluster_cmd('status', '--format=json-pretty')
1548 return json.loads(status)
1549
1550 def raw_osd_status(self):
1551 """
1552 Get osd status from cluster
1553 """
1554 return self.raw_cluster_cmd('osd', 'dump')
1555
1556 def get_osd_status(self):
1557 """
1558 Get osd statuses sorted by states that the osds are in.
1559 """
1560 osd_lines = filter(
1561 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1562 self.raw_osd_status().split('\n'))
1563 self.log(osd_lines)
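# Classify osds: parse up/down and in/out from the 'osd dump' text, and
# use the daemon handles to tell which osd processes are actually
# running (live) or stopped (dead).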
1564 in_osds = [int(i[4:].split()[0])
1565 for i in filter(lambda x: " in " in x, osd_lines)]
1566 out_osds = [int(i[4:].split()[0])
1567 for i in filter(lambda x: " out " in x, osd_lines)]
1568 up_osds = [int(i[4:].split()[0])
1569 for i in filter(lambda x: " up " in x, osd_lines)]
1570 down_osds = [int(i[4:].split()[0])
1571 for i in filter(lambda x: " down " in x, osd_lines)]
1572 dead_osds = [int(x.id_)
1573 for x in filter(lambda x:
1574 not x.running(),
1575 self.ctx.daemons.
1576 iter_daemons_of_role('osd', self.cluster))]
1577 live_osds = [int(x.id_) for x in
1578 filter(lambda x:
1579 x.running(),
1580 self.ctx.daemons.iter_daemons_of_role('osd',
1581 self.cluster))]
1582 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1583 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1584 'raw': osd_lines}
1585
1586 def get_num_pgs(self):
1587 """
1588 Check cluster status for the number of pgs
1589 """
1590 status = self.raw_cluster_status()
1591 self.log(status)
1592 return status['pgmap']['num_pgs']
1593
1594 def create_erasure_code_profile(self, profile_name, profile):
1595 """
1596 Create an erasure code profile name that can be used as a parameter
1597 when creating an erasure coded pool.
1598 """
1599 with self.lock:
1600 args = cmd_erasure_code_profile(profile_name, profile)
1601 self.raw_cluster_cmd(*args)
1602
1603 def create_pool_with_unique_name(self, pg_num=16,
1604 erasure_code_profile_name=None,
1605 min_size=None,
1606 erasure_code_use_overwrites=False):
1607 """
1608 Create a pool named unique_pool_X where X is unique.
1609 """
1610 name = ""
1611 with self.lock:
1612 name = "unique_pool_%s" % (str(self.next_pool_id),)
1613 self.next_pool_id += 1
1614 self.create_pool(
1615 name,
1616 pg_num,
1617 erasure_code_profile_name=erasure_code_profile_name,
1618 min_size=min_size,
1619 erasure_code_use_overwrites=erasure_code_use_overwrites)
1620 return name
1621
1622 @contextlib.contextmanager
1623 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1624 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1625 yield
1626 self.remove_pool(pool_name)
1627
1628 def create_pool(self, pool_name, pg_num=16,
1629 erasure_code_profile_name=None,
1630 min_size=None,
1631 erasure_code_use_overwrites=False):
1632 """
1633 Create a pool named from the pool_name parameter.
1634 :param pool_name: name of the pool being created.
1635 :param pg_num: initial number of pgs.
1636 :param erasure_code_profile_name: if set and !None create an
1637 erasure coded pool using the profile
1638 :param erasure_code_use_overwrites: if true, allow overwrites
1639 """
1640 with self.lock:
1641 assert isinstance(pool_name, basestring)
1642 assert isinstance(pg_num, int)
1643 assert pool_name not in self.pools
1644 self.log("creating pool_name %s" % (pool_name,))
1645 if erasure_code_profile_name:
1646 self.raw_cluster_cmd('osd', 'pool', 'create',
1647 pool_name, str(pg_num), str(pg_num),
1648 'erasure', erasure_code_profile_name)
1649 else:
1650 self.raw_cluster_cmd('osd', 'pool', 'create',
1651 pool_name, str(pg_num))
1652 if min_size is not None:
1653 self.raw_cluster_cmd(
1654 'osd', 'pool', 'set', pool_name,
1655 'min_size',
1656 str(min_size))
1657 if erasure_code_use_overwrites:
1658 self.raw_cluster_cmd(
1659 'osd', 'pool', 'set', pool_name,
1660 'allow_ec_overwrites',
1661 'true')
1662 self.raw_cluster_cmd(
1663 'osd', 'pool', 'application', 'enable',
1664 pool_name, 'rados', '--yes-i-really-mean-it',
1665 run.Raw('||'), 'true')
1666 self.pools[pool_name] = pg_num
1667 time.sleep(1)
1668
1669 def add_pool_snap(self, pool_name, snap_name):
1670 """
1671 Add pool snapshot
1672 :param pool_name: name of pool to snapshot
1673 :param snap_name: name of snapshot to take
1674 """
1675 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1676 str(pool_name), str(snap_name))
1677
1678 def remove_pool_snap(self, pool_name, snap_name):
1679 """
1680 Remove pool snapshot
1681 :param pool_name: name of pool to snapshot
1682 :param snap_name: name of snapshot to remove
1683 """
1684 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1685 str(pool_name), str(snap_name))
1686
1687 def remove_pool(self, pool_name):
1688 """
1689 Remove the indicated pool
1690 :param pool_name: Pool to be removed
1691 """
1692 with self.lock:
1693 assert isinstance(pool_name, basestring)
1694 assert pool_name in self.pools
1695 self.log("removing pool_name %s" % (pool_name,))
1696 del self.pools[pool_name]
1697 self.do_rados(self.controller,
1698 ['rmpool', pool_name, pool_name,
1699 "--yes-i-really-really-mean-it"])
1700
1701 def get_pool(self):
1702 """
1703 Pick a random pool
1704 """
1705 with self.lock:
1706 return random.choice(self.pools.keys())
1707
1708 def get_pool_pg_num(self, pool_name):
1709 """
1710 Return the number of pgs in the pool specified.
1711 """
1712 with self.lock:
1713 assert isinstance(pool_name, basestring)
1714 if pool_name in self.pools:
1715 return self.pools[pool_name]
1716 return 0
1717
1718 def get_pool_property(self, pool_name, prop):
1719 """
1720 :param pool_name: pool
1721 :param prop: property to be checked.
1722 :returns: property as an int value.
1723 """
1724 with self.lock:
1725 assert isinstance(pool_name, basestring)
1726 assert isinstance(prop, basestring)
1727 output = self.raw_cluster_cmd(
1728 'osd',
1729 'pool',
1730 'get',
1731 pool_name,
1732 prop)
1733 return int(output.split()[1])
1734
1735 def set_pool_property(self, pool_name, prop, val):
1736 """
1737 :param pool_name: pool
1738 :param prop: property to be set.
1739 :param val: value to set.
1740
1741 This routine retries if set operation fails.
1742 """
1743 with self.lock:
1744 assert isinstance(pool_name, basestring)
1745 assert isinstance(prop, basestring)
1746 assert isinstance(val, int)
1747 tries = 0
1748 while True:
1749 r = self.raw_cluster_cmd_result(
1750 'osd',
1751 'pool',
1752 'set',
1753 pool_name,
1754 prop,
1755 str(val))
1756 if r != 11: # EAGAIN
1757 break
1758 tries += 1
1759 if tries > 50:
1760 raise Exception('gave up (still getting EAGAIN) '
1761 'when setting pool property %s %s = %s' %
1762 (pool_name, prop, val))
1763 self.log('got EAGAIN setting pool property, '
1764 'waiting a few seconds...')
1765 time.sleep(2)
1766
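# Illustrative usage sketch: set_pool_property() already absorbs transient
# EAGAIN results, so callers (assuming a pool named 'scratch' exists) can
# set and read back pool values directly:
#
#     manager.set_pool_property('scratch', 'min_size', 1)
#     min_size = manager.get_pool_property('scratch', 'min_size')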
1767 def expand_pool(self, pool_name, by, max_pgs):
1768 """
1769 Increase the number of pgs in a pool
1770 """
1771 with self.lock:
1772 assert isinstance(pool_name, basestring)
1773 assert isinstance(by, int)
1774 assert pool_name in self.pools
1775 if self.get_num_creating() > 0:
1776 return False
1777 if (self.pools[pool_name] + by) > max_pgs:
1778 return False
1779 self.log("increase pool size by %d" % (by,))
1780 new_pg_num = self.pools[pool_name] + by
1781 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1782 self.pools[pool_name] = new_pg_num
1783 return True
1784
1785 def set_pool_pgpnum(self, pool_name, force):
1786 """
1787 Set pgpnum property of pool_name pool.
1788 """
1789 with self.lock:
1790 assert isinstance(pool_name, basestring)
1791 assert pool_name in self.pools
1792 if not force and self.get_num_creating() > 0:
1793 return False
1794 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1795 return True
1796
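# Illustrative usage sketch: pg counts are grown in two steps; expand_pool()
# raises pg_num, and set_pool_pgpnum() brings pgp_num up to match once no
# pgs are still creating (assuming a pool named 'scratch'):
#
#     if manager.expand_pool('scratch', by=2, max_pgs=128):
#         manager.set_pool_pgpnum('scratch', force=False)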
1797 def list_pg_missing(self, pgid):
1798 """
1799 return list of missing pgs with the id specified
1800 """
1801 r = None
1802 offset = {}
1803 while True:
1804 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
1805 json.dumps(offset))
1806 j = json.loads(out)
1807 if r is None:
1808 r = j
1809 else:
1810 r['objects'].extend(j['objects'])
1811 if 'more' not in j:
1812 break
1813 if j['more'] == 0:
1814 break
1815 offset = j['objects'][-1]['oid']
1816 if 'more' in r:
1817 del r['more']
1818 return r
1819
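# Illustrative usage sketch: list_pg_missing() pages through
# "pg <pgid> list_missing" and concatenates the per-page results
# (assuming pg "2.0" exists):
#
#     missing = manager.list_pg_missing('2.0')
#     unfound_oids = [obj['oid'] for obj in missing['objects']]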
1820 def get_pg_stats(self):
1821 """
1822 Dump the cluster and get pg stats
1823 """
1824 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
1825 j = json.loads('\n'.join(out.split('\n')[1:]))
1826 return j['pg_stats']
1827
1828 def get_pgids_to_force(self, backfill):
1829 """
1830 Return the randomized list of PGs that can have their recovery/backfill forced
1831 """
1832 j = self.get_pg_stats()
1833 pgids = []
1834 if backfill:
1835 wanted = ['degraded', 'backfilling', 'backfill_wait']
1836 else:
1837 wanted = ['recovering', 'degraded', 'recovery_wait']
1838 for pg in j:
1839 status = pg['state'].split('+')
1840 for t in wanted:
1841 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
1842 pgids.append(pg['pgid'])
1843 break
1844 return pgids
1845
1846 def get_pgids_to_cancel_force(self, backfill):
1847 """
1848 Return the randomized list of PGs whose recovery/backfill priority is forced
1849 """
1850 j = self.get_pg_stats()
1851 pgids = []
1852 if backfill:
1853 wanted = 'forced_backfill'
1854 else:
1855 wanted = 'forced_recovery'
1856 for pg in j:
1857 status = pg['state'].split('+')
1858 if wanted in status and random.random() > 0.5:
1859 pgids.append(pg['pgid'])
1860 return pgids
1861
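# Illustrative usage sketch: the two helpers above only pick candidate pgs;
# the priority change itself is issued through the "pg force-recovery" /
# "pg force-backfill" (and cancel-force-*) cluster commands:
#
#     pgids = manager.get_pgids_to_force(backfill=False)
#     if pgids:
#         manager.raw_cluster_cmd('pg', 'force-recovery', *pgids)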
1862 def compile_pg_status(self):
1863 """
1864 Return a histogram of pg state values
1865 """
1866 ret = {}
1867 j = self.get_pg_stats()
1868 for pg in j:
1869 for status in pg['state'].split('+'):
1870 if status not in ret:
1871 ret[status] = 0
1872 ret[status] += 1
1873 return ret
1874
1875 @wait_for_pg_stats
1876 def with_pg_state(self, pool, pgnum, check):
1877 pgstr = self.get_pgid(pool, pgnum)
1878 stats = self.get_single_pg_stats(pgstr)
1879 assert(check(stats['state']))
1880
1881 @wait_for_pg_stats
1882 def with_pg(self, pool, pgnum, check):
1883 pgstr = self.get_pgid(pool, pgnum)
1884 stats = self.get_single_pg_stats(pgstr)
1885 return check(stats)
1886
1887 def get_last_scrub_stamp(self, pool, pgnum):
1888 """
1889 Get the timestamp of the last scrub.
1890 """
1891 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
1892 return stats["last_scrub_stamp"]
1893
1894 def do_pg_scrub(self, pool, pgnum, stype):
1895 """
1896 Scrub pg and wait for scrubbing to finish
1897 """
1898 init = self.get_last_scrub_stamp(pool, pgnum)
1899 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
1900 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1901 SLEEP_TIME = 10
1902 timer = 0
1903 while init == self.get_last_scrub_stamp(pool, pgnum):
1904 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1905 self.log("waiting for scrub type %s" % (stype,))
1906 if (timer % RESEND_TIMEOUT) == 0:
1907 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1908 # The first time in this loop is the actual request
1909 if timer != 0 and stype == "repair":
1910 self.log("WARNING: Resubmitted a non-idempotent repair")
1911 time.sleep(SLEEP_TIME)
1912 timer += SLEEP_TIME
1913
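# Illustrative usage sketch: scrub or repair pg 0 of a pool and block until
# its scrub stamp changes (assuming a pool named 'scratch'):
#
#     manager.do_pg_scrub('scratch', 0, 'scrub')       # or 'deep-scrub'
#     manager.do_pg_scrub('scratch', 0, 'repair')      # repair is not idempotent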
1914 def wait_snap_trimming_complete(self, pool):
1915 """
1916 Wait for snap trimming on pool to end
1917 """
1918 POLL_PERIOD = 10
1919 FATAL_TIMEOUT = 600
1920 start = time.time()
1921 poolnum = self.get_pool_num(pool)
1922 poolnumstr = "%s." % (poolnum,)
1923 while True:
1924 now = time.time()
1925 if (now - start) > FATAL_TIMEOUT:
1926 assert (now - start) < FATAL_TIMEOUT, \
1927 'failed to complete snap trimming before timeout'
1928 all_stats = self.get_pg_stats()
1929 trimming = False
1930 for pg in all_stats:
1931 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
1932 self.log("pg {pg} in trimming, state: {state}".format(
1933 pg=pg['pgid'],
1934 state=pg['state']))
1935 trimming = True
1936 if not trimming:
1937 break
1938 self.log("{pool} still trimming, waiting".format(pool=pool))
1939 time.sleep(POLL_PERIOD)
1940
1941 def get_single_pg_stats(self, pgid):
1942 """
1943 Return pg for the pgid specified.
1944 """
1945 all_stats = self.get_pg_stats()
1946
1947 for pg in all_stats:
1948 if pg['pgid'] == pgid:
1949 return pg
1950
1951 return None
1952
1953 def get_object_pg_with_shard(self, pool, name, osdid):
1954 """
1955 """
1956 pool_dump = self.get_pool_dump(pool)
1957 object_map = self.get_object_map(pool, name)
1958 if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1959 shard = object_map['acting'].index(osdid)
1960 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1961 shard=shard)
1962 else:
1963 return object_map['pgid']
1964
1965 def get_object_primary(self, pool, name):
1966 """
1967 """
1968 object_map = self.get_object_map(pool, name)
1969 return object_map['acting_primary']
1970
1971 def get_object_map(self, pool, name):
1972 """
1973 osd map --format=json converted to a python object
1974 :returns: the python object
1975 """
1976 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
1977 return json.loads('\n'.join(out.split('\n')[1:]))
1978
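# Illustrative usage sketch: locate an object, assuming pool 'scratch'
# holds an object named 'obj1':
#
#     primary = manager.get_object_primary('scratch', 'obj1')
#     pgid = manager.get_object_pg_with_shard('scratch', 'obj1', primary)
#     pg_stats = manager.get_single_pg_stats(
#         manager.get_object_map('scratch', 'obj1')['pgid'])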
1979 def get_osd_dump_json(self):
1980 """
1981 osd dump --format=json converted to a python object
1982 :returns: the python object
1983 """
1984 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
1985 return json.loads('\n'.join(out.split('\n')[1:]))
1986
1987 def get_osd_dump(self):
1988 """
1989 Dump osds
1990 :returns: all osds
1991 """
1992 return self.get_osd_dump_json()['osds']
1993
1994 def get_mgr_dump(self):
1995 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
1996 return json.loads(out)
1997
1998 def get_stuck_pgs(self, type_, threshold):
1999 """
2000 :returns: stuck pg information from the cluster
2001 """
2002 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2003 '--format=json')
2004 return json.loads(out)
2005
2006 def get_num_unfound_objects(self):
2007 """
2008 Check cluster status to get the number of unfound objects
2009 """
2010 status = self.raw_cluster_status()
2011 self.log(status)
2012 return status['pgmap'].get('unfound_objects', 0)
2013
2014 def get_num_creating(self):
2015 """
2016 Find the number of pgs in creating mode.
2017 """
2018 pgs = self.get_pg_stats()
2019 num = 0
2020 for pg in pgs:
2021 if 'creating' in pg['state']:
2022 num += 1
2023 return num
2024
2025 def get_num_active_clean(self):
2026 """
2027 Find the number of active and clean pgs.
2028 """
2029 pgs = self.get_pg_stats()
2030 num = 0
2031 for pg in pgs:
2032 if (pg['state'].count('active') and
2033 pg['state'].count('clean') and
2034 not pg['state'].count('stale')):
2035 num += 1
2036 return num
2037
2038 def get_num_active_recovered(self):
2039 """
2040 Find the number of active and recovered pgs.
2041 """
2042 pgs = self.get_pg_stats()
2043 num = 0
2044 for pg in pgs:
2045 if (pg['state'].count('active') and
2046 not pg['state'].count('recover') and
2047 not pg['state'].count('backfilling') and
2048 not pg['state'].count('stale')):
2049 num += 1
2050 return num
2051
2052 def get_is_making_recovery_progress(self):
2053 """
2054 Return whether there is recovery progress discernible in the
2055 raw cluster status
2056 """
2057 status = self.raw_cluster_status()
2058 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2059 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2060 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2061 return kps > 0 or bps > 0 or ops > 0
2062
2063 def get_num_active(self):
2064 """
2065 Find the number of active pgs.
2066 """
2067 pgs = self.get_pg_stats()
2068 num = 0
2069 for pg in pgs:
2070 if pg['state'].count('active') and not pg['state'].count('stale'):
2071 num += 1
2072 return num
2073
2074 def get_num_down(self):
2075 """
2076 Find the number of pgs that are down.
2077 """
2078 pgs = self.get_pg_stats()
2079 num = 0
2080 for pg in pgs:
2081 if ((pg['state'].count('down') and not
2082 pg['state'].count('stale')) or
2083 (pg['state'].count('incomplete') and not
2084 pg['state'].count('stale'))):
2085 num += 1
2086 return num
2087
2088 def get_num_active_down(self):
2089 """
2090 Find the number of pgs that are either active or down.
2091 """
2092 pgs = self.get_pg_stats()
2093 num = 0
2094 for pg in pgs:
2095 if ((pg['state'].count('active') and not
2096 pg['state'].count('stale')) or
2097 (pg['state'].count('down') and not
2098 pg['state'].count('stale')) or
2099 (pg['state'].count('incomplete') and not
2100 pg['state'].count('stale'))):
2101 num += 1
2102 return num
2103
2104 def is_clean(self):
2105 """
2106 True if all pgs are clean
2107 """
2108 return self.get_num_active_clean() == self.get_num_pgs()
2109
2110 def is_recovered(self):
2111 """
2112 True if all pgs have recovered
2113 """
2114 return self.get_num_active_recovered() == self.get_num_pgs()
2115
2116 def is_active_or_down(self):
2117 """
2118 True if all pgs are active or down
2119 """
2120 return self.get_num_active_down() == self.get_num_pgs()
2121
2122 def wait_for_clean(self, timeout=None):
2123 """
2124 Returns true when all pgs are clean.
2125 """
2126 self.log("waiting for clean")
2127 start = time.time()
2128 num_active_clean = self.get_num_active_clean()
2129 while not self.is_clean():
2130 if timeout is not None:
2131 if self.get_is_making_recovery_progress():
2132 self.log("making progress, resetting timeout")
2133 start = time.time()
2134 else:
2135 self.log("no progress seen, keeping timeout for now")
2136 if time.time() - start >= timeout:
2137 self.log('dumping pgs')
2138 out = self.raw_cluster_cmd('pg', 'dump')
2139 self.log(out)
2140 assert time.time() - start < timeout, \
2141 'failed to become clean before timeout expired'
2142 cur_active_clean = self.get_num_active_clean()
2143 if cur_active_clean != num_active_clean:
2144 start = time.time()
2145 num_active_clean = cur_active_clean
2146 time.sleep(3)
2147 self.log("clean!")
2148
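# Illustrative usage sketch: the wait_for_* helpers poll get_pg_stats()
# every few seconds; with a timeout they keep waiting as long as recovery
# is still making visible progress:
#
#     manager.wait_for_clean(timeout=1200)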
2149 def are_all_osds_up(self):
2150 """
2151 Returns true if all osds are up.
2152 """
2153 x = self.get_osd_dump()
2154 return (len(x) == sum([(y['up'] > 0) for y in x]))
2155
2156 def wait_for_all_osds_up(self, timeout=None):
2157 """
2158 When this exits, either the timeout has expired, or all
2159 osds are up.
2160 """
2161 self.log("waiting for all up")
2162 start = time.time()
2163 while not self.are_all_osds_up():
2164 if timeout is not None:
2165 assert time.time() - start < timeout, \
2166 'timeout expired in wait_for_all_osds_up'
2167 time.sleep(3)
2168 self.log("all up!")
2169
2170 def pool_exists(self, pool):
2171 if pool in self.list_pools():
2172 return True
2173 return False
2174
2175 def wait_for_pool(self, pool, timeout=300):
2176 """
2177 Wait for a pool to exist
2178 """
2179 self.log('waiting for pool %s to exist' % pool)
2180 start = time.time()
2181 while not self.pool_exists(pool):
2182 if timeout is not None:
2183 assert time.time() - start < timeout, \
2184 'timeout expired in wait_for_pool'
2185 time.sleep(3)
2186
2187 def wait_for_pools(self, pools):
2188 for pool in pools:
2189 self.wait_for_pool(pool)
2190
2191 def is_mgr_available(self):
2192 x = self.get_mgr_dump()
2193 return x.get('available', False)
2194
2195 def wait_for_mgr_available(self, timeout=None):
2196 self.log("waiting for mgr available")
2197 start = time.time()
2198 while not self.is_mgr_available():
2199 if timeout is not None:
2200 assert time.time() - start < timeout, \
2201 'timeout expired in wait_for_mgr_available'
2202 time.sleep(3)
2203 self.log("mgr available!")
2204
2205 def wait_for_recovery(self, timeout=None):
2206 """
2207 Check peering. When this exits, we have recovered.
2208 """
2209 self.log("waiting for recovery to complete")
2210 start = time.time()
2211 num_active_recovered = self.get_num_active_recovered()
2212 while not self.is_recovered():
2213 now = time.time()
2214 if timeout is not None:
2215 if self.get_is_making_recovery_progress():
2216 self.log("making progress, resetting timeout")
2217 start = time.time()
2218 else:
2219 self.log("no progress seen, keeping timeout for now")
2220 if now - start >= timeout:
2221 if self.is_recovered():
2222 break
2223 self.log('dumping pgs')
2224 out = self.raw_cluster_cmd('pg', 'dump')
2225 self.log(out)
2226 assert now - start < timeout, \
2227 'failed to recover before timeout expired'
2228 cur_active_recovered = self.get_num_active_recovered()
2229 if cur_active_recovered != num_active_recovered:
2230 start = time.time()
2231 num_active_recovered = cur_active_recovered
2232 time.sleep(3)
2233 self.log("recovered!")
2234
2235 def wait_for_active(self, timeout=None):
2236 """
2237 Check peering. When this exits, we are definitely active
2238 """
2239 self.log("waiting for peering to complete")
2240 start = time.time()
2241 num_active = self.get_num_active()
2242 while not self.is_active():
2243 if timeout is not None:
2244 if time.time() - start >= timeout:
2245 self.log('dumping pgs')
2246 out = self.raw_cluster_cmd('pg', 'dump')
2247 self.log(out)
2248 assert time.time() - start < timeout, \
2249 'failed to become active before timeout expired'
2250 cur_active = self.get_num_active()
2251 if cur_active != num_active:
2252 start = time.time()
2253 num_active = cur_active
2254 time.sleep(3)
2255 self.log("active!")
2256
2257 def wait_for_active_or_down(self, timeout=None):
2258 """
2259 Check peering. When this exits, we are definitely either
2260 active or down
2261 """
2262 self.log("waiting for peering to complete or become blocked")
2263 start = time.time()
2264 num_active_down = self.get_num_active_down()
2265 while not self.is_active_or_down():
2266 if timeout is not None:
2267 if time.time() - start >= timeout:
2268 self.log('dumping pgs')
2269 out = self.raw_cluster_cmd('pg', 'dump')
2270 self.log(out)
2271 assert time.time() - start < timeout, \
2272 'failed to become active or down before timeout expired'
2273 cur_active_down = self.get_num_active_down()
2274 if cur_active_down != num_active_down:
2275 start = time.time()
2276 num_active_down = cur_active_down
2277 time.sleep(3)
2278 self.log("active or down!")
2279
2280 def osd_is_up(self, osd):
2281 """
2282 Wrapper for osd check
2283 """
2284 osds = self.get_osd_dump()
2285 return osds[osd]['up'] > 0
2286
2287 def wait_till_osd_is_up(self, osd, timeout=None):
2288 """
2289 Loop waiting for osd.
2290 """
2291 self.log('waiting for osd.%d to be up' % osd)
2292 start = time.time()
2293 while not self.osd_is_up(osd):
2294 if timeout is not None:
2295 assert time.time() - start < timeout, \
2296 'osd.%d failed to come up before timeout expired' % osd
2297 time.sleep(3)
2298 self.log('osd.%d is up' % osd)
2299
2300 def is_active(self):
2301 """
2302 Wrapper to check if all pgs are active
2303 """
2304 return self.get_num_active() == self.get_num_pgs()
2305
2306 def wait_till_active(self, timeout=None):
2307 """
2308 Wait until all pgs are active.
2309 """
2310 self.log("waiting till active")
2311 start = time.time()
2312 while not self.is_active():
2313 if timeout is not None:
2314 if time.time() - start >= timeout:
2315 self.log('dumping pgs')
2316 out = self.raw_cluster_cmd('pg', 'dump')
2317 self.log(out)
2318 assert time.time() - start < timeout, \
2319 'failed to become active before timeout expired'
2320 time.sleep(3)
2321 self.log("active!")
2322
2323 def wait_till_pg_convergence(self, timeout=None):
2324 start = time.time()
2325 old_stats = None
2326 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2327 if osd['in'] and osd['up']]
2328 while True:
2329 # strictly speaking, no need to wait for mon. but due to the
2330 # "ms inject socket failures" setting, the osdmap could be delayed,
2331 # so mgr is likely to ignore the pg-stat messages with pgs serving
2332 # newly created pools which is not yet known by mgr. so, to make sure
2333 # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
2334 # necessary.
2335 self.flush_pg_stats(active_osds)
2336 new_stats = dict((stat['pgid'], stat['state'])
2337 for stat in self.get_pg_stats())
2338 if old_stats == new_stats:
2339 return old_stats
2340 if timeout is not None:
2341 assert time.time() - start < timeout, \
2342 'failed to reach convergence before %d secs' % timeout
2343 old_stats = new_stats
2344 # longer than mgr_stats_period
2345 time.sleep(5 + 1)
2346
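# Illustrative usage sketch: after creating a pool, wait until the pg states
# reported by the mgr stop changing before sampling them:
#
#     manager.create_pool('scratch', pg_num=32)
#     pg_states = manager.wait_till_pg_convergence(timeout=300)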
2347 def mark_out_osd(self, osd):
2348 """
2349 Wrapper to mark osd out.
2350 """
2351 self.raw_cluster_cmd('osd', 'out', str(osd))
2352
2353 def kill_osd(self, osd):
2354 """
2355 Kill osds by either power cycling (if indicated by the config)
2356 or by stopping.
2357 """
2358 if self.config.get('powercycle'):
2359 remote = self.find_remote('osd', osd)
2360 self.log('kill_osd on osd.{o} '
2361 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2362 self._assert_ipmi(remote)
2363 remote.console.power_off()
2364 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2365 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2366 self.raw_cluster_cmd(
2367 '--', 'tell', 'osd.%d' % osd,
2368 'injectargs',
2369 '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
2370 )
2371 try:
2372 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2373 except:
2374 pass
2375 else:
2376 raise RuntimeError('osd.%s did not fail' % osd)
2377 else:
2378 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2379 else:
2380 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2381
2382 @staticmethod
2383 def _assert_ipmi(remote):
2384 assert remote.console.has_ipmi_credentials, (
2385 "powercycling requested but RemoteConsole is not "
2386 "initialized. Check ipmi config.")
2387
2388 def blackhole_kill_osd(self, osd):
2389 """
2390 Stop osd if nothing else works.
2391 """
2392 self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
2393 'injectargs',
2394 '--objectstore-blackhole')
2395 time.sleep(2)
2396 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2397
2398 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
2399 """
2400 Revive osds by either power cycling (if indicated by the config)
2401 or by restarting.
2402 """
2403 if self.config.get('powercycle'):
2404 remote = self.find_remote('osd', osd)
2405 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2406 format(o=osd, s=remote.name))
2407 self._assert_ipmi(remote)
2408 remote.console.power_on()
2409 if not remote.console.check_status(300):
2410 raise Exception('Failed to revive osd.{o} via ipmi'.
2411 format(o=osd))
2412 teuthology.reconnect(self.ctx, 60, [remote])
2413 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2414 self.make_admin_daemon_dir(remote)
2415 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2416 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2417
2418 if not skip_admin_check:
2419 # wait for dump_ops_in_flight; this command doesn't appear
2420 # until after the signal handler is installed and it is safe
2421 # to stop the osd again without making valgrind leak checks
2422 # unhappy. see #5924.
2423 self.wait_run_admin_socket('osd', osd,
2424 args=['dump_ops_in_flight'],
2425 timeout=timeout, stdout=DEVNULL)
2426
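# Illustrative usage sketch: a minimal osd down/up cycle with the helpers
# above (the Thrasher drives the same calls with randomized timing):
#
#     manager.kill_osd(0)
#     manager.mark_down_osd(0)
#     manager.mark_out_osd(0)
#     manager.revive_osd(0, timeout=360)
#     manager.wait_till_osd_is_up(0, timeout=300)
#     manager.mark_in_osd(0)
#     manager.wait_for_clean()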
2427 def mark_down_osd(self, osd):
2428 """
2429 Cluster command wrapper
2430 """
2431 self.raw_cluster_cmd('osd', 'down', str(osd))
2432
2433 def mark_in_osd(self, osd):
2434 """
2435 Cluster command wrapper
2436 """
2437 self.raw_cluster_cmd('osd', 'in', str(osd))
2438
2439 def signal_osd(self, osd, sig, silent=False):
2440 """
2441 Wrapper to local get_daemon call which sends the given
2442 signal to the given osd.
2443 """
2444 self.ctx.daemons.get_daemon('osd', osd,
2445 self.cluster).signal(sig, silent=silent)
2446
2447 ## monitors
2448 def signal_mon(self, mon, sig, silent=False):
2449 """
2450 Wrapper to local get_daemon call
2451 """
2452 self.ctx.daemons.get_daemon('mon', mon,
2453 self.cluster).signal(sig, silent=silent)
2454
2455 def kill_mon(self, mon):
2456 """
2457 Kill the monitor by either power cycling (if the config says so),
2458 or by doing a stop.
2459 """
2460 if self.config.get('powercycle'):
2461 remote = self.find_remote('mon', mon)
2462 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2463 format(m=mon, s=remote.name))
2464 self._assert_ipmi(remote)
2465 remote.console.power_off()
2466 else:
2467 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2468
2469 def revive_mon(self, mon):
2470 """
2471 Restart by either power cycling (if the config says so),
2472 or by doing a normal restart.
2473 """
2474 if self.config.get('powercycle'):
2475 remote = self.find_remote('mon', mon)
2476 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2477 format(m=mon, s=remote.name))
2478 self._assert_ipmi(remote)
2479 remote.console.power_on()
2480 self.make_admin_daemon_dir(remote)
2481 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2482
2483 def revive_mgr(self, mgr):
2484 """
2485 Restart by either power cycling (if the config says so),
2486 or by doing a normal restart.
2487 """
2488 if self.config.get('powercycle'):
2489 remote = self.find_remote('mgr', mgr)
2490 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2491 format(m=mgr, s=remote.name))
2492 self._assert_ipmi(remote)
2493 remote.console.power_on()
2494 self.make_admin_daemon_dir(remote)
2495 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2496
2497 def get_mon_status(self, mon):
2498 """
2499 Extract all the monitor status information from the cluster
2500 """
2501 addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
2502 out = self.raw_cluster_cmd('-m', addr, 'mon_status')
2503 return json.loads(out)
2504
2505 def get_mon_quorum(self):
2506 """
2507 Extract monitor quorum information from the cluster
2508 """
2509 out = self.raw_cluster_cmd('quorum_status')
2510 j = json.loads(out)
2511 self.log('quorum_status is %s' % out)
2512 return j['quorum']
2513
2514 def wait_for_mon_quorum_size(self, size, timeout=300):
2515 """
2516 Loop until quorum size is reached.
2517 """
2518 self.log('waiting for quorum size %d' % size)
2519 start = time.time()
2520 while len(self.get_mon_quorum()) != size:
2521 if timeout is not None:
2522 assert time.time() - start < timeout, \
2523 ('failed to reach quorum size %d '
2524 'before timeout expired' % size)
2525 time.sleep(3)
2526 self.log("quorum is size %d" % size)
2527
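# Illustrative usage sketch: take one monitor out of a three-mon quorum and
# bring it back (mon ids are the bare names, e.g. 'a'):
#
#     manager.kill_mon('a')
#     manager.wait_for_mon_quorum_size(2, timeout=300)
#     manager.revive_mon('a')
#     manager.wait_for_mon_quorum_size(3, timeout=300)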
2528 def get_mon_health(self, debug=False):
2529 """
2530 Extract all the monitor health information.
2531 """
2532 out = self.raw_cluster_cmd('health', '--format=json')
2533 if debug:
2534 self.log('health:\n{h}'.format(h=out))
2535 return json.loads(out)
2536
2537 def get_mds_status(self, mds):
2538 """
2539 Run cluster commands for the mds in order to get mds information
2540 """
2541 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
2542 j = json.loads(' '.join(out.splitlines()[1:]))
2543 # collate; for dup ids, larger gid wins.
2544 for info in j['info'].itervalues():
2545 if info['name'] == mds:
2546 return info
2547 return None
2548
2549 def get_filepath(self):
2550 """
2551 Return path to osd data with {id} needing to be replaced
2552 """
2553 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2554
2555 def make_admin_daemon_dir(self, remote):
2556 """
2557 Create /var/run/ceph directory on remote site.
2558
2560 :param remote: Remote site
2561 """
2562 remote.run(args=['sudo',
2563 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2564
2565
2566def utility_task(name):
2567 """
2568 Generate ceph_manager subtask corresponding to ceph_manager
2569 method name
2570 """
2571 def task(ctx, config):
2572 if config is None:
2573 config = {}
2574 args = config.get('args', [])
2575 kwargs = config.get('kwargs', {})
2576 cluster = config.get('cluster', 'ceph')
2577 fn = getattr(ctx.managers[cluster], name)
2578 fn(*args, **kwargs)
2579 return task
2580
2581revive_osd = utility_task("revive_osd")
2582revive_mon = utility_task("revive_mon")
2583kill_osd = utility_task("kill_osd")
2584kill_mon = utility_task("kill_mon")
2585create_pool = utility_task("create_pool")
2586remove_pool = utility_task("remove_pool")
2587wait_for_clean = utility_task("wait_for_clean")
2588flush_all_pg_stats = utility_task("flush_all_pg_stats")
2589set_pool_property = utility_task("set_pool_property")
2590do_pg_scrub = utility_task("do_pg_scrub")
2591wait_for_pool = utility_task("wait_for_pool")
2592wait_for_pools = utility_task("wait_for_pools")
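# Illustrative usage sketch: each utility_task() wrapper is exposed as a
# teuthology task; it can also be driven directly from Python, assuming a
# teuthology ctx whose managers dict holds a CephManager for cluster 'ceph':
#
#     task = utility_task('wait_for_clean')
#     task(ctx, {'kwargs': {'timeout': 900}})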