ceph/qa/tasks/ceph_manager.py
1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from cStringIO import StringIO
5 from functools import wraps
6 import contextlib
7 import random
8 import signal
9 import time
10 import gevent
11 import base64
12 import json
13 import logging
14 import threading
15 import traceback
16 import os
17 from teuthology import misc as teuthology
18 from tasks.scrub import Scrubber
19 from util.rados import cmd_erasure_code_profile
20 from util import get_remote
21 from teuthology.contextutil import safe_while
22 from teuthology.orchestra.remote import Remote
23 from teuthology.orchestra import run
24 from teuthology.exceptions import CommandFailedError
25
26 try:
27 from subprocess import DEVNULL # py3k
28 except ImportError:
29 DEVNULL = open(os.devnull, 'r+')
30
31 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33 log = logging.getLogger(__name__)
34
35
36 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
37 conf_fp = StringIO()
38 ctx.ceph[cluster].conf.write(conf_fp)
39 conf_fp.seek(0)
40 writes = ctx.cluster.run(
41 args=[
42 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
44 'sudo', 'python',
45 '-c',
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
48 conf_path,
49 run.Raw('&&'),
50 'sudo', 'chmod', '0644', conf_path,
51 ],
52 stdin=run.PIPE,
53 wait=False)
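# feed the rendered conf file to each remote's stdin in parallel,
# then wait for all of the copy commands to finish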
54 teuthology.feed_many_stdins_and_close(conf_fp, writes)
55 run.wait(writes)
56
57
58 def mount_osd_data(ctx, remote, cluster, osd):
59 """
60 Mount a remote OSD
61
62 :param ctx: Context
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
65 :param osd: Osd name
66 """
67 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68 role = "{0}.osd.{1}".format(cluster, osd)
69 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70 if remote in ctx.disk_config.remote_to_roles_to_dev:
71 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
72 role = alt_role
73 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
74 return
75 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76 mount_options = ctx.disk_config.\
77 remote_to_roles_to_dev_mount_options[remote][role]
78 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
80
81 log.info('Mounting osd.{o} on {n}: dev: {d}, cluster: {c}, '
82 'mountpoint: {p}, type: {t}, options: {v}'.format(
83 o=osd, n=remote.name, d=dev, p=mnt, t=fstype,
84 v=mount_options, c=cluster))
85
86 remote.run(
87 args=[
88 'sudo',
89 'mount',
90 '-t', fstype,
91 '-o', ','.join(mount_options),
92 dev,
93 mnt,
94 ]
95 )
96
97
98 class Thrasher:
99 """
100 Object used to thrash Ceph
101 """
102 def __init__(self, manager, config, logger=None):
103 self.ceph_manager = manager
104 self.cluster = manager.cluster
105 self.ceph_manager.wait_for_clean()
106 osd_status = self.ceph_manager.get_osd_status()
107 self.in_osds = osd_status['in']
108 self.live_osds = osd_status['live']
109 self.out_osds = osd_status['out']
110 self.dead_osds = osd_status['dead']
111 self.stopping = False
112 self.logger = logger
113 self.config = config
114 self.revive_timeout = self.config.get("revive_timeout", 360)
115 self.pools_to_fix_pgp_num = set()
116 if self.config.get('powercycle'):
117 self.revive_timeout += 120
118 self.clean_wait = self.config.get('clean_wait', 0)
119 self.minin = self.config.get("min_in", 4)
120 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121 self.sighup_delay = self.config.get('sighup_delay')
122 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123 self.dump_ops_enable = self.config.get('dump_ops_enable')
124 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
127 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
128 self.random_eio = self.config.get('random_eio', 0.0)
129 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
130
131 num_osds = self.in_osds + self.out_osds
132 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
133 if self.logger is not None:
134 self.log = lambda x: self.logger.info(x)
135 else:
136 def tmp(x):
137 """
138 Implement log behavior
139 """
140 print x
141 self.log = tmp
142 if self.config is None:
143 self.config = dict()
144 # prevent monitor from auto-marking things out while thrasher runs
145 # try both old and new tell syntax, in case we are testing old code
146 self.saved_options = []
147 # assuming that the default settings do not vary from one daemon to
148 # another
149 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
150 opts = [('mon', 'mon_osd_down_out_interval', 0)]
151 for service, opt, new_value in opts:
152 old_value = manager.get_config(first_mon[0],
153 first_mon[1],
154 opt)
155 self.saved_options.append((service, opt, old_value))
156 self._set_config(service, '*', opt, new_value)
157 # initialize ceph_objectstore_tool property - must be done before
158 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
159 if (self.config.get('powercycle') or
160 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
161 self.config.get('disable_objectstore_tool_tests', False)):
162 self.ceph_objectstore_tool = False
163 self.test_rm_past_intervals = False
164 if self.config.get('powercycle'):
165 self.log("Unable to test ceph-objectstore-tool, "
166 "powercycle testing")
167 else:
168 self.log("Unable to test ceph-objectstore-tool, "
169 "not available on all OSD nodes")
170 else:
171 self.ceph_objectstore_tool = \
172 self.config.get('ceph_objectstore_tool', True)
173 self.test_rm_past_intervals = \
174 self.config.get('test_rm_past_intervals', True)
175 # spawn do_thrash
176 self.thread = gevent.spawn(self.do_thrash)
177 if self.sighup_delay:
178 self.sighup_thread = gevent.spawn(self.do_sighup)
179 if self.optrack_toggle_delay:
180 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
181 if self.dump_ops_enable == "true":
182 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
183 if self.noscrub_toggle_delay:
184 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
185
186 def _set_config(self, service_type, service_id, name, value):
187 opt_arg = '--{name} {value}'.format(name=name, value=value)
188 whom = '.'.join([service_type, service_id])
189 self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
190 'injectargs', opt_arg)
191
192
193 def cmd_exists_on_osds(self, cmd):
194 allremotes = self.ceph_manager.ctx.cluster.only(\
195 teuthology.is_type('osd', self.cluster)).remotes.keys()
196 allremotes = list(set(allremotes))
197 for remote in allremotes:
198 proc = remote.run(args=['type', cmd], wait=True,
199 check_status=False, stdout=StringIO(),
200 stderr=StringIO())
201 if proc.exitstatus != 0:
202 return False
203 return True
204
205 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
206 """
207 :param osd: Osd to be killed.
208 :param mark_down: Mark down if true.
209 :param mark_out: Mark out if true.
210 """
211 if osd is None:
212 osd = random.choice(self.live_osds)
213 self.log("Killing osd %s, live_osds are %s" % (str(osd),
214 str(self.live_osds)))
215 self.live_osds.remove(osd)
216 self.dead_osds.append(osd)
217 self.ceph_manager.kill_osd(osd)
218 if mark_down:
219 self.ceph_manager.mark_down_osd(osd)
220 if mark_out and osd in self.in_osds:
221 self.out_osd(osd)
222 if self.ceph_objectstore_tool:
223 self.log("Testing ceph-objectstore-tool on down osd")
224 remote = self.ceph_manager.find_remote('osd', osd)
225 FSPATH = self.ceph_manager.get_filepath()
226 JPATH = os.path.join(FSPATH, "journal")
227 exp_osd = imp_osd = osd
228 exp_remote = imp_remote = remote
229 # If an older osd is available we'll move a pg from there
230 if (len(self.dead_osds) > 1 and
231 random.random() < self.chance_move_pg):
232 exp_osd = random.choice(self.dead_osds[:-1])
233 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
234 if ('keyvaluestore_backend' in
235 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
236 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
237 "--data-path {fpath} --journal-path {jpath} "
238 "--type keyvaluestore "
239 "--log-file="
240 "/var/log/ceph/objectstore_tool.\\$pid.log ".
241 format(fpath=FSPATH, jpath=JPATH))
242 else:
243 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
244 "--data-path {fpath} --journal-path {jpath} "
245 "--log-file="
246 "/var/log/ceph/objectstore_tool.\\$pid.log ".
247 format(fpath=FSPATH, jpath=JPATH))
248 cmd = (prefix + "--op list-pgs").format(id=exp_osd)
249
250 # ceph-objectstore-tool might be temporarily absent during an
251 # upgrade - see http://tracker.ceph.com/issues/18014
252 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
253 while proceed():
254 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
255 wait=True, check_status=False, stdout=StringIO(),
256 stderr=StringIO())
257 if proc.exitstatus == 0:
258 break
259 log.debug("ceph-objectstore-tool binary not present, trying again")
260
261 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
262 # see http://tracker.ceph.com/issues/19556
263 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
264 while proceed():
265 proc = exp_remote.run(args=cmd, wait=True,
266 check_status=False,
267 stdout=StringIO(), stderr=StringIO())
268 if proc.exitstatus == 0:
269 break
270 elif proc.exitstatus == 1 and "OSD has the store locked" in proc.stderr.getvalue():
271 continue
272 else:
273 raise Exception("ceph-objectstore-tool: "
274 "exp list-pgs failure with status {ret}".
275 format(ret=proc.exitstatus))
276
277 pgs = proc.stdout.getvalue().split('\n')[:-1]
278 if len(pgs) == 0:
279 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
280 return
281 pg = random.choice(pgs)
282 exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
283 exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
284 exp_path = os.path.join(exp_path,
285 "exp.{pg}.{id}".format(
286 pg=pg,
287 id=exp_osd))
288 # export
289 # Can't use new export-remove op since this is part of upgrade testing
290 cmd = prefix + "--op export --pgid {pg} --file {file}"
291 cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
292 proc = exp_remote.run(args=cmd)
293 if proc.exitstatus:
294 raise Exception("ceph-objectstore-tool: "
295 "export failure with status {ret}".
296 format(ret=proc.exitstatus))
297 # remove
298 cmd = prefix + "--force --op remove --pgid {pg}"
299 cmd = cmd.format(id=exp_osd, pg=pg)
300 proc = exp_remote.run(args=cmd)
301 if proc.exitstatus:
302 raise Exception("ceph-objectstore-tool: "
303 "remove failure with status {ret}".
304 format(ret=proc.exitstatus))
305 # If there are at least 2 dead osds we might move the pg
306 if exp_osd != imp_osd:
307 # If pg isn't already on this osd, then we will move it there
308 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
309 proc = imp_remote.run(args=cmd, wait=True,
310 check_status=False, stdout=StringIO())
311 if proc.exitstatus:
312 raise Exception("ceph-objectstore-tool: "
313 "imp list-pgs failure with status {ret}".
314 format(ret=proc.exitstatus))
315 pgs = proc.stdout.getvalue().split('\n')[:-1]
316 if pg not in pgs:
317 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
318 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
319 if imp_remote != exp_remote:
320 # Copy export file to the other machine
321 self.log("Transfer export file from {srem} to {trem}".
322 format(srem=exp_remote, trem=imp_remote))
323 tmpexport = Remote.get_file(exp_remote, exp_path)
324 Remote.put_file(imp_remote, tmpexport, exp_path)
325 os.remove(tmpexport)
326 else:
327 # Can't move the pg after all
328 imp_osd = exp_osd
329 imp_remote = exp_remote
330 # import
331 cmd = (prefix + "--op import --file {file}")
332 cmd = cmd.format(id=imp_osd, file=exp_path)
333 proc = imp_remote.run(args=cmd, wait=True, check_status=False,
334 stderr=StringIO())
335 if proc.exitstatus == 1:
336 bogosity = "The OSD you are using is older than the exported PG"
337 if bogosity in proc.stderr.getvalue():
338 self.log("OSD older than exported PG"
339 "...ignored")
340 elif proc.exitstatus == 10:
341 self.log("Pool went away before processing an import"
342 "...ignored")
343 elif proc.exitstatus == 11:
344 self.log("Attempt to import an incompatible export"
345 "...ignored")
346 elif proc.exitstatus:
347 raise Exception("ceph-objectstore-tool: "
348 "import failure with status {ret}".
349 format(ret=proc.exitstatus))
350 cmd = "rm -f {file}".format(file=exp_path)
351 exp_remote.run(args=cmd)
352 if imp_remote != exp_remote:
353 imp_remote.run(args=cmd)
354
355 # apply low split settings to each pool
356 for pool in self.ceph_manager.list_pools():
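# strip the leading "sudo " from the prefix so the CEPH_ARGS
# environment variable survives when re-running under "sudo -E"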
357 no_sudo_prefix = prefix[5:]
358 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
359 "--filestore-split-multiple 1' sudo -E "
360 + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
361 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
362 output = proc.stderr.getvalue()
363 if 'Couldn\'t find pool' in output:
364 continue
365 if proc.exitstatus:
366 raise Exception("ceph-objectstore-tool apply-layout-settings"
367 " failed with {status}".format(status=proc.exitstatus))
368
369 def rm_past_intervals(self, osd=None):
370 """
371 :param osd: Osd to find pg to remove past intervals
372 """
373 if self.test_rm_past_intervals:
374 if osd is None:
375 osd = random.choice(self.dead_osds)
376 self.log("Use ceph_objectstore_tool to remove past intervals")
377 remote = self.ceph_manager.find_remote('osd', osd)
378 FSPATH = self.ceph_manager.get_filepath()
379 JPATH = os.path.join(FSPATH, "journal")
380 if ('keyvaluestore_backend' in
381 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
382 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
383 "--data-path {fpath} --journal-path {jpath} "
384 "--type keyvaluestore "
385 "--log-file="
386 "/var/log/ceph/objectstore_tool.\\$pid.log ".
387 format(fpath=FSPATH, jpath=JPATH))
388 else:
389 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
390 "--data-path {fpath} --journal-path {jpath} "
391 "--log-file="
392 "/var/log/ceph/objectstore_tool.\\$pid.log ".
393 format(fpath=FSPATH, jpath=JPATH))
394 cmd = (prefix + "--op list-pgs").format(id=osd)
395 proc = remote.run(args=cmd, wait=True,
396 check_status=False, stdout=StringIO())
397 if proc.exitstatus:
398 raise Exception("ceph_objectstore_tool: "
399 "exp list-pgs failure with status {ret}".
400 format(ret=proc.exitstatus))
401 pgs = proc.stdout.getvalue().split('\n')[:-1]
402 if len(pgs) == 0:
403 self.log("No PGs found for osd.{osd}".format(osd=osd))
404 return
405 pg = random.choice(pgs)
406 cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
407 format(id=osd, pg=pg)
408 proc = remote.run(args=cmd)
409 if proc.exitstatus:
410 raise Exception("ceph_objectstore_tool: "
411 "rm-past-intervals failure with status {ret}".
412 format(ret=proc.exitstatus))
413
414 def blackhole_kill_osd(self, osd=None):
415 """
416 If all else fails, kill the osd.
417 :param osd: Osd to be killed.
418 """
419 if osd is None:
420 osd = random.choice(self.live_osds)
421 self.log("Blackholing and then killing osd %s, live_osds are %s" %
422 (str(osd), str(self.live_osds)))
423 self.live_osds.remove(osd)
424 self.dead_osds.append(osd)
425 self.ceph_manager.blackhole_kill_osd(osd)
426
427 def revive_osd(self, osd=None, skip_admin_check=False):
428 """
429 Revive the osd.
430 :param osd: Osd to be revived.
431 """
432 if osd is None:
433 osd = random.choice(self.dead_osds)
434 self.log("Reviving osd %s" % (str(osd),))
435 self.ceph_manager.revive_osd(
436 osd,
437 self.revive_timeout,
438 skip_admin_check=skip_admin_check)
439 self.dead_osds.remove(osd)
440 self.live_osds.append(osd)
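# if read-error injection is enabled, re-arm it on the designated osd
# after its restart, since injected settings do not persist across a restart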
441 if self.random_eio > 0 and osd == self.rerrosd:
442 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
443 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
444 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
445 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
446
447
448 def out_osd(self, osd=None):
449 """
450 Mark the osd out
451 :param osd: Osd to be marked.
452 """
453 if osd is None:
454 osd = random.choice(self.in_osds)
455 self.log("Removing osd %s, in_osds are: %s" %
456 (str(osd), str(self.in_osds)))
457 self.ceph_manager.mark_out_osd(osd)
458 self.in_osds.remove(osd)
459 self.out_osds.append(osd)
460
461 def in_osd(self, osd=None):
462 """
463 Mark the osd in
464 :param osd: Osd to be marked.
465 """
466 if osd is None:
467 osd = random.choice(self.out_osds)
468 if osd in self.dead_osds:
469 return self.revive_osd(osd)
470 self.log("Adding osd %s" % (str(osd),))
471 self.out_osds.remove(osd)
472 self.in_osds.append(osd)
473 self.ceph_manager.mark_in_osd(osd)
474 self.log("Added osd %s" % (str(osd),))
475
476 def reweight_osd_or_by_util(self, osd=None):
477 """
478 Reweight an osd that is in
479 :param osd: Osd to be marked.
480 """
481 if osd is not None or random.choice([True, False]):
482 if osd is None:
483 osd = random.choice(self.in_osds)
484 val = random.uniform(.1, 1.0)
485 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
486 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
487 str(osd), str(val))
488 else:
489 # do it several times, the option space is large
490 for i in range(5):
491 options = {
492 'max_change': random.choice(['0.05', '1.0', '3.0']),
493 'overage': random.choice(['110', '1000']),
494 'type': random.choice([
495 'reweight-by-utilization',
496 'test-reweight-by-utilization']),
497 }
498 self.log("Reweighting by: %s"%(str(options),))
499 self.ceph_manager.raw_cluster_cmd(
500 'osd',
501 options['type'],
502 options['overage'],
503 options['max_change'])
504
505 def primary_affinity(self, osd=None):
506 if osd is None:
507 osd = random.choice(self.in_osds)
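# ~50% of the time pick a random affinity, otherwise pin it to 1 or 0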
508 if random.random() >= .5:
509 pa = random.random()
510 elif random.random() >= .5:
511 pa = 1
512 else:
513 pa = 0
514 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
515 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
516 str(osd), str(pa))
517
518 def thrash_cluster_full(self):
519 """
520 Set and unset cluster full condition
521 """
522 self.log('Setting full ratio to .001')
523 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
524 time.sleep(1)
525 self.log('Setting full ratio back to .95')
526 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
527
528 def thrash_pg_upmap(self):
529 """
530 Install or remove random pg_upmap entries in OSDMap
531 """
532 from random import shuffle
533 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
534 j = json.loads(out)
535 self.log('j is %s' % j)
536 try:
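# ~70% of the time install a random pg-upmap entry; otherwise clear
# an existing one, if any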
537 if random.random() >= .3:
538 pgs = self.ceph_manager.get_pg_stats()
539 pg = random.choice(pgs)
540 pgid = str(pg['pgid'])
541 poolid = int(pgid.split('.')[0])
542 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
543 if len(sizes) == 0:
544 return
545 n = sizes[0]
546 osds = self.in_osds + self.out_osds
547 shuffle(osds)
548 osds = osds[0:n]
549 self.log('Setting %s to %s' % (pgid, osds))
550 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
551 self.log('cmd %s' % cmd)
552 self.ceph_manager.raw_cluster_cmd(*cmd)
553 else:
554 m = j['pg_upmap']
555 if len(m) > 0:
556 shuffle(m)
557 pg = m[0]['pgid']
558 self.log('Clearing pg_upmap on %s' % pg)
559 self.ceph_manager.raw_cluster_cmd(
560 'osd',
561 'rm-pg-upmap',
562 pg)
563 else:
564 self.log('No pg_upmap entries; doing nothing')
565 except CommandFailedError:
566 self.log('Failed to rm-pg-upmap, ignoring')
567
568 def thrash_pg_upmap_items(self):
569 """
570 Install or remove random pg_upmap_items entries in OSDMap
571 """
572 from random import shuffle
573 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
574 j = json.loads(out)
575 self.log('j is %s' % j)
576 try:
577 if random.random() >= .3:
578 pgs = self.ceph_manager.get_pg_stats()
579 pg = random.choice(pgs)
580 pgid = str(pg['pgid'])
581 poolid = int(pgid.split('.')[0])
582 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
583 if len(sizes) == 0:
584 return
585 n = sizes[0]
586 osds = self.in_osds + self.out_osds
587 shuffle(osds)
588 osds = osds[0:n*2]
589 self.log('Setting %s to %s' % (pgid, osds))
590 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
591 self.log('cmd %s' % cmd)
592 self.ceph_manager.raw_cluster_cmd(*cmd)
593 else:
594 m = j['pg_upmap_items']
595 if len(m) > 0:
596 shuffle(m)
597 pg = m[0]['pgid']
598 self.log('Clearing pg_upmap_items on %s' % pg)
599 self.ceph_manager.raw_cluster_cmd(
600 'osd',
601 'rm-pg-upmap-items',
602 pg)
603 else:
604 self.log('No pg_upmap_items entries; doing nothing')
605 except CommandFailedError:
606 self.log('Failed to rm-pg-upmap-items, ignoring')
607
608 def force_recovery(self):
609 """
610 Force recovery or backfill on some PGs
611 """
612 backfill = random.random() >= 0.5
613 j = self.ceph_manager.get_pgids_to_force(backfill)
614 if j:
615 try:
616 if backfill:
617 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
618 else:
619 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
620 except CommandFailedError:
621 self.log('Failed to force backfill|recovery, ignoring')
622
623
624 def cancel_force_recovery(self):
625 """
626 Cancel forced recovery or backfill on some PGs
627 """
628 backfill = random.random() >= 0.5
629 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
630 if j:
631 try:
632 if backfill:
633 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
634 else:
635 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
636 except CommandFailedError:
637 self.log('Failed to cancel force backfill|recovery, ignoring')
638
639 def force_cancel_recovery(self):
640 """
641 Force or cancel forcing recovery
642 """
643 if random.random() >= 0.4:
644 self.force_recovery()
645 else:
646 self.cancel_force_recovery()
647
648 def all_up(self):
649 """
650 Make sure all osds are up and not out.
651 """
652 while len(self.dead_osds) > 0:
653 self.log("reviving osd")
654 self.revive_osd()
655 while len(self.out_osds) > 0:
656 self.log("inning osd")
657 self.in_osd()
658
659 def all_up_in(self):
660 """
661 Make sure all osds are up and fully in.
662 """
663 self.all_up()
664 for osd in self.live_osds:
665 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
666 str(osd), str(1))
667 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
668 str(osd), str(1))
669
670 def do_join(self):
671 """
672 Break out of this Ceph loop
673 """
674 self.stopping = True
675 self.thread.get()
676 if self.sighup_delay:
677 self.log("joining the do_sighup greenlet")
678 self.sighup_thread.get()
679 if self.optrack_toggle_delay:
680 self.log("joining the do_optrack_toggle greenlet")
681 self.optrack_toggle_thread.join()
682 if self.dump_ops_enable == "true":
683 self.log("joining the do_dump_ops greenlet")
684 self.dump_ops_thread.join()
685 if self.noscrub_toggle_delay:
686 self.log("joining the do_noscrub_toggle greenlet")
687 self.noscrub_toggle_thread.join()
688
689 def grow_pool(self):
690 """
691 Increase the number of pgs in the pool
692 """
693 pool = self.ceph_manager.get_pool()
694 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
695 self.log("Growing pool %s" % (pool,))
696 if self.ceph_manager.expand_pool(pool,
697 self.config.get('pool_grow_by', 10),
698 self.max_pgs):
699 self.pools_to_fix_pgp_num.add(pool)
700
701 def fix_pgp_num(self, pool=None):
702 """
703 Fix the pgp_num of the pool to match its pg_num.
704 """
705 if pool is None:
706 pool = self.ceph_manager.get_pool()
707 force = False
708 else:
709 force = True
710 self.log("fixing pg num pool %s" % (pool,))
711 if self.ceph_manager.set_pool_pgpnum(pool, force):
712 self.pools_to_fix_pgp_num.discard(pool)
713
714 def test_pool_min_size(self):
715 """
716 Kill and revive all osds except one.
717 """
718 self.log("test_pool_min_size")
719 self.all_up()
720 self.ceph_manager.wait_for_recovery(
721 timeout=self.config.get('timeout')
722 )
723 the_one = random.choice(self.in_osds)
724 self.log("Killing everyone but %s", the_one)
725 to_kill = filter(lambda x: x != the_one, self.in_osds)
726 [self.kill_osd(i) for i in to_kill]
727 [self.out_osd(i) for i in to_kill]
728 time.sleep(self.config.get("test_pool_min_size_time", 10))
729 self.log("Killing %s" % (the_one,))
730 self.kill_osd(the_one)
731 self.out_osd(the_one)
732 self.log("Reviving everyone but %s" % (the_one,))
733 [self.revive_osd(i) for i in to_kill]
734 [self.in_osd(i) for i in to_kill]
735 self.log("Revived everyone but %s" % (the_one,))
736 self.log("Waiting for clean")
737 self.ceph_manager.wait_for_recovery(
738 timeout=self.config.get('timeout')
739 )
740
741 def inject_pause(self, conf_key, duration, check_after, should_be_down):
742 """
743 Pause injection testing. Check for osd being down when finished.
744 """
745 the_one = random.choice(self.live_osds)
746 self.log("inject_pause on {osd}".format(osd=the_one))
747 self.log(
748 "Testing {key} pause injection for duration {duration}".format(
749 key=conf_key,
750 duration=duration
751 ))
752 self.log(
753 "Checking after {after}, should_be_down={shouldbedown}".format(
754 after=check_after,
755 shouldbedown=should_be_down
756 ))
757 self.ceph_manager.set_config(the_one, **{conf_key: duration})
758 if not should_be_down:
759 return
760 time.sleep(check_after)
761 status = self.ceph_manager.get_osd_status()
762 assert the_one in status['down']
763 time.sleep(duration - check_after + 20)
764 status = self.ceph_manager.get_osd_status()
765 assert the_one not in status['down']
766
767 def test_backfill_full(self):
768 """
769 Test backfills stopping when the replica fills up.
770
771 First, use the injectfull admin command on all of the OSDs to
772 simulate them being backfillfull.
773
774 Second, on a random subset, set
775 osd_debug_skip_full_check_in_backfill_reservation to force
776 the more complicated check in do_scan to be exercised.
777
778 Then, verify that all backfilling stops.
779 """
780 self.log("injecting backfill full")
781 for i in self.live_osds:
782 self.ceph_manager.set_config(
783 i,
784 osd_debug_skip_full_check_in_backfill_reservation=
785 random.choice(['false', 'true']))
786 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
787 check_status=True, timeout=30, stdout=DEVNULL)
788 for i in range(30):
789 status = self.ceph_manager.compile_pg_status()
790 if 'backfilling' not in status.keys():
791 break
792 self.log(
793 "waiting for {still_going} backfillings".format(
794 still_going=status.get('backfilling')))
795 time.sleep(1)
796 assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
797 for i in self.live_osds:
798 self.ceph_manager.set_config(
799 i,
800 osd_debug_skip_full_check_in_backfill_reservation='false')
801 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
802 check_status=True, timeout=30, stdout=DEVNULL)
803
804 def test_map_discontinuity(self):
805 """
806 1) Allows the osds to recover
807 2) kills an osd
808 3) allows the remaining osds to recover
809 4) waits for some time
810 5) revives the osd
811 This sequence should cause the revived osd to have to handle
812 a map gap, since the mons would have trimmed the older maps
813 """
814 while len(self.in_osds) < (self.minin + 1):
815 self.in_osd()
816 self.log("Waiting for recovery")
817 self.ceph_manager.wait_for_all_osds_up(
818 timeout=self.config.get('timeout')
819 )
820 # now we wait 20s for the pg status to change, if it takes longer,
821 # the test *should* fail!
822 time.sleep(20)
823 self.ceph_manager.wait_for_clean(
824 timeout=self.config.get('timeout')
825 )
826
827 # now we wait 20s for the backfill replicas to hear about the clean
828 time.sleep(20)
829 self.log("Recovered, killing an osd")
830 self.kill_osd(mark_down=True, mark_out=True)
831 self.log("Waiting for clean again")
832 self.ceph_manager.wait_for_clean(
833 timeout=self.config.get('timeout')
834 )
835 self.log("Waiting for trim")
836 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
837 self.revive_osd()
838
839 def choose_action(self):
840 """
841 Random action selector.
842 """
843 chance_down = self.config.get('chance_down', 0.4)
844 chance_test_min_size = self.config.get('chance_test_min_size', 0)
845 chance_test_backfill_full = \
846 self.config.get('chance_test_backfill_full', 0)
847 if isinstance(chance_down, int):
848 chance_down = float(chance_down) / 100
849 minin = self.minin
850 minout = self.config.get("min_out", 0)
851 minlive = self.config.get("min_live", 2)
852 mindead = self.config.get("min_dead", 0)
853
854 self.log('choose_action: min_in %d min_out '
855 '%d min_live %d min_dead %d' %
856 (minin, minout, minlive, mindead))
857 actions = []
858 if len(self.in_osds) > minin:
859 actions.append((self.out_osd, 1.0,))
860 if len(self.live_osds) > minlive and chance_down > 0:
861 actions.append((self.kill_osd, chance_down,))
862 if len(self.dead_osds) > 1:
863 actions.append((self.rm_past_intervals, 1.0,))
864 if len(self.out_osds) > minout:
865 actions.append((self.in_osd, 1.7,))
866 if len(self.dead_osds) > mindead:
867 actions.append((self.revive_osd, 1.0,))
868 if self.config.get('thrash_primary_affinity', True):
869 actions.append((self.primary_affinity, 1.0,))
870 actions.append((self.reweight_osd_or_by_util,
871 self.config.get('reweight_osd', .5),))
872 actions.append((self.grow_pool,
873 self.config.get('chance_pgnum_grow', 0),))
874 actions.append((self.fix_pgp_num,
875 self.config.get('chance_pgpnum_fix', 0),))
876 actions.append((self.test_pool_min_size,
877 chance_test_min_size,))
878 actions.append((self.test_backfill_full,
879 chance_test_backfill_full,))
880 if self.chance_thrash_cluster_full > 0:
881 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
882 if self.chance_thrash_pg_upmap > 0:
883 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
884 if self.chance_thrash_pg_upmap_items > 0:
885 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
886 if self.chance_force_recovery > 0:
887 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
888
889 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
890 for scenario in [
891 (lambda:
892 self.inject_pause(key,
893 self.config.get('pause_short', 3),
894 0,
895 False),
896 self.config.get('chance_inject_pause_short', 1),),
897 (lambda:
898 self.inject_pause(key,
899 self.config.get('pause_long', 80),
900 self.config.get('pause_check_after', 70),
901 True),
902 self.config.get('chance_inject_pause_long', 0),)]:
903 actions.append(scenario)
904
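# weighted random selection: each action is chosen with probability
# prob / total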
905 total = sum([y for (x, y) in actions])
906 val = random.uniform(0, total)
907 for (action, prob) in actions:
908 if val < prob:
909 return action
910 val -= prob
911 return None
912
913 def log_exc(func):
914 @wraps(func)
915 def wrapper(self):
916 try:
917 return func(self)
918 except:
919 self.log(traceback.format_exc())
920 raise
921 return wrapper
922
923 @log_exc
924 def do_sighup(self):
925 """
926 Loops and sends signal.SIGHUP to a random live osd.
927
928 Loop delay is controlled by the config value sighup_delay.
929 """
930 delay = float(self.sighup_delay)
931 self.log("starting do_sighup with a delay of {0}".format(delay))
932 while not self.stopping:
933 osd = random.choice(self.live_osds)
934 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
935 time.sleep(delay)
936
937 @log_exc
938 def do_optrack_toggle(self):
939 """
940 Loops and toggle op tracking to all osds.
941
942 Loop delay is controlled by the config value optrack_toggle_delay.
943 """
944 delay = float(self.optrack_toggle_delay)
945 osd_state = "true"
946 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
947 while not self.stopping:
948 if osd_state == "true":
949 osd_state = "false"
950 else:
951 osd_state = "true"
952 self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
953 'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
954 gevent.sleep(delay)
955
956 @log_exc
957 def do_dump_ops(self):
958 """
959 Loops and does op dumps on all osds
960 """
961 self.log("starting do_dump_ops")
962 while not self.stopping:
963 for osd in self.live_osds:
964 # Ignore errors because live_osds is in flux
965 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
966 check_status=False, timeout=30, stdout=DEVNULL)
967 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
968 check_status=False, timeout=30, stdout=DEVNULL)
969 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
970 check_status=False, timeout=30, stdout=DEVNULL)
971 gevent.sleep(0)
972
973 @log_exc
974 def do_noscrub_toggle(self):
975 """
976 Loops and toggle noscrub flags
977
978 Loop delay is controlled by the config value noscrub_toggle_delay.
979 """
980 delay = float(self.noscrub_toggle_delay)
981 scrub_state = "none"
982 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
983 while not self.stopping:
984 if scrub_state == "none":
985 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
986 scrub_state = "noscrub"
987 elif scrub_state == "noscrub":
988 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
989 scrub_state = "both"
990 elif scrub_state == "both":
991 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
992 scrub_state = "nodeep-scrub"
993 else:
994 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
995 scrub_state = "none"
996 gevent.sleep(delay)
997 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
998 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
999
1000 @log_exc
1001 def do_thrash(self):
1002 """
1003 Loop to select random actions to thrash ceph manager with.
1004 """
1005 cleanint = self.config.get("clean_interval", 60)
1006 scrubint = self.config.get("scrub_interval", -1)
1007 maxdead = self.config.get("max_dead", 0)
1008 delay = self.config.get("op_delay", 5)
1009 self.rerrosd = self.live_osds[0]
1010 if self.random_eio > 0:
1011 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1012 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
1013 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1014 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
1015 self.log("starting do_thrash")
1016 while not self.stopping:
1017 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1018 "out_osds: ", self.out_osds,
1019 "dead_osds: ", self.dead_osds,
1020 "live_osds: ", self.live_osds]]
1021 self.log(" ".join(to_log))
1022 if random.uniform(0, 1) < (float(delay) / cleanint):
1023 while len(self.dead_osds) > maxdead:
1024 self.revive_osd()
1025 for osd in self.in_osds:
1026 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1027 str(osd), str(1))
1028 if random.uniform(0, 1) < float(
1029 self.config.get('chance_test_map_discontinuity', 0)):
1030 self.test_map_discontinuity()
1031 else:
1032 self.ceph_manager.wait_for_recovery(
1033 timeout=self.config.get('timeout')
1034 )
1035 time.sleep(self.clean_wait)
1036 if scrubint > 0:
1037 if random.uniform(0, 1) < (float(delay) / scrubint):
1038 self.log('Scrubbing while thrashing being performed')
1039 Scrubber(self.ceph_manager, self.config)
1040 self.choose_action()()
1041 time.sleep(delay)
1042 self.all_up()
1043 if self.random_eio > 0:
1044 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1045 'injectargs', '--', '--filestore_debug_random_read_err=0.0')
1046 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1047 'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
1048 for pool in list(self.pools_to_fix_pgp_num):
1049 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1050 self.fix_pgp_num(pool)
1051 self.pools_to_fix_pgp_num.clear()
1052 for service, opt, saved_value in self.saved_options:
1053 self._set_config(service, '*', opt, saved_value)
1054 self.saved_options = []
1055 self.all_up_in()
1056
1057
1058 class ObjectStoreTool:
1059
1060 def __init__(self, manager, pool, **kwargs):
1061 self.manager = manager
1062 self.pool = pool
1063 self.osd = kwargs.get('osd', None)
1064 self.object_name = kwargs.get('object_name', None)
1065 self.do_revive = kwargs.get('do_revive', True)
1066 if self.osd and self.pool and self.object_name:
1067 if self.osd == "primary":
1068 self.osd = self.manager.get_object_primary(self.pool,
1069 self.object_name)
1070 assert self.osd
1071 if self.object_name:
1072 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1073 self.object_name,
1074 self.osd)
1075 self.remote = self.manager.ctx.\
1076 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1077 path = self.manager.get_filepath().format(id=self.osd)
1078 self.paths = ("--data-path {path} --journal-path {path}/journal".
1079 format(path=path))
1080
1081 def build_cmd(self, options, args, stdin):
1082 lines = []
1083 if self.object_name:
1084 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1085 "{paths} --pgid {pgid} --op list |"
1086 "grep '\"oid\":\"{name}\"')".
1087 format(paths=self.paths,
1088 pgid=self.pgid,
1089 name=self.object_name))
1090 args = '"$object" ' + args
1091 options += " --pgid {pgid}".format(pgid=self.pgid)
1092 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1093 format(paths=self.paths,
1094 args=args,
1095 options=options))
1096 if stdin:
1097 cmd = ("echo {payload} | base64 --decode | {cmd}".
1098 format(payload=base64.b64encode(stdin),
1099 cmd=cmd))
1100 lines.append(cmd)
1101 return "\n".join(lines)
1102
1103 def run(self, options, args, stdin=None, stdout=None):
1104 if stdout is None:
1105 stdout = StringIO()
1106 self.manager.kill_osd(self.osd)
1107 cmd = self.build_cmd(options, args, stdin)
1108 self.manager.log(cmd)
1109 try:
1110 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1111 check_status=False,
1112 stdout=stdout,
1113 stderr=StringIO())
1114 proc.wait()
1115 if proc.exitstatus != 0:
1116 self.manager.log("failed with " + str(proc.exitstatus))
1117 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1118 raise Exception(error)
1119 finally:
1120 if self.do_revive:
1121 self.manager.revive_osd(self.osd)
1122 self.manager.wait_till_osd_is_up(self.osd, 300)
1123
1124
1125 class CephManager:
1126 """
1127 Ceph manager object.
1128 Contains several local functions that form a bulk of this module.
1129
1130 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1131 the same name.
1132 """
1133
1134 REPLICATED_POOL = 1
1135 ERASURE_CODED_POOL = 3
1136
1137 def __init__(self, controller, ctx=None, config=None, logger=None,
1138 cluster='ceph'):
1139 self.lock = threading.RLock()
1140 self.ctx = ctx
1141 self.config = config
1142 self.controller = controller
1143 self.next_pool_id = 0
1144 self.cluster = cluster
1145 if (logger):
1146 self.log = lambda x: logger.info(x)
1147 else:
1148 def tmp(x):
1149 """
1150 implement log behavior.
1151 """
1152 print x
1153 self.log = tmp
1154 if self.config is None:
1155 self.config = dict()
1156 pools = self.list_pools()
1157 self.pools = {}
1158 for pool in pools:
1159 # we may race with a pool deletion; ignore failures here
1160 try:
1161 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1162 except CommandFailedError:
1163 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1164
1165 def raw_cluster_cmd(self, *args):
1166 """
1167 Run a ceph command on the cluster and return its stdout
1168 """
1169 testdir = teuthology.get_testdir(self.ctx)
1170 ceph_args = [
1171 'sudo',
1172 'adjust-ulimits',
1173 'ceph-coverage',
1174 '{tdir}/archive/coverage'.format(tdir=testdir),
1175 'timeout',
1176 '120',
1177 'ceph',
1178 '--cluster',
1179 self.cluster,
1180 ]
1181 ceph_args.extend(args)
1182 proc = self.controller.run(
1183 args=ceph_args,
1184 stdout=StringIO(),
1185 )
1186 return proc.stdout.getvalue()
1187
1188 def raw_cluster_cmd_result(self, *args):
1189 """
1190 Run a ceph command on the cluster and return its exit status.
1191 """
1192 testdir = teuthology.get_testdir(self.ctx)
1193 ceph_args = [
1194 'sudo',
1195 'adjust-ulimits',
1196 'ceph-coverage',
1197 '{tdir}/archive/coverage'.format(tdir=testdir),
1198 'timeout',
1199 '120',
1200 'ceph',
1201 '--cluster',
1202 self.cluster,
1203 ]
1204 ceph_args.extend(args)
1205 proc = self.controller.run(
1206 args=ceph_args,
1207 check_status=False,
1208 )
1209 return proc.exitstatus
1210
1211 def run_ceph_w(self):
1212 """
1213 Execute "ceph -w" in the background with stdout connected to a StringIO,
1214 and return the RemoteProcess.
1215 """
1216 return self.controller.run(
1217 args=["sudo",
1218 "daemon-helper",
1219 "kill",
1220 "ceph",
1221 '--cluster',
1222 self.cluster,
1223 "-w"],
1224 wait=False, stdout=StringIO(), stdin=run.PIPE)
1225
1226 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1227 """
1228 Flush pg stats from a list of OSD ids, ensuring they are reflected
1229 all the way to the monitor. Luminous and later only.
1230
1231 :param osds: list of OSDs to flush
1232 :param no_wait: list of OSDs not to wait for seq id. by default, we
1233 wait for all specified osds, but some of them could be
1234 moved out of osdmap, so we cannot get their updated
1235 stat seq from monitor anymore. in that case, you need
1236 to pass a blacklist.
1237 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1238 it. (5 min by default)
1239 """
1240 seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
1241 for osd in osds}
1242 if not wait_for_mon:
1243 return
1244 if no_wait is None:
1245 no_wait = []
1246 for osd, need in seq.iteritems():
1247 if osd in no_wait:
1248 continue
1249 got = 0
1250 while wait_for_mon > 0:
1251 got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
1252 self.log('need seq {need} got {got} for osd.{osd}'.format(
1253 need=need, got=got, osd=osd))
1254 if got >= need:
1255 break
1256 A_WHILE = 1
1257 time.sleep(A_WHILE)
1258 wait_for_mon -= A_WHILE
1259 else:
1260 raise Exception('timed out waiting for mon to be updated with '
1261 'osd.{osd}: {got} < {need}'.
1262 format(osd=osd, got=got, need=need))
1263
1264 def flush_all_pg_stats(self):
1265 self.flush_pg_stats(range(len(self.get_osd_dump())))
1266
1267 def do_rados(self, remote, cmd, check_status=True):
1268 """
1269 Execute a remote rados command.
1270 """
1271 testdir = teuthology.get_testdir(self.ctx)
1272 pre = [
1273 'adjust-ulimits',
1274 'ceph-coverage',
1275 '{tdir}/archive/coverage'.format(tdir=testdir),
1276 'rados',
1277 '--cluster',
1278 self.cluster,
1279 ]
1280 pre.extend(cmd)
1281 proc = remote.run(
1282 args=pre,
1283 wait=True,
1284 check_status=check_status
1285 )
1286 return proc
1287
1288 def rados_write_objects(self, pool, num_objects, size,
1289 timelimit, threads, cleanup=False):
1290 """
1291 Write rados objects
1292 Threads not used yet.
1293 """
1294 args = [
1295 '-p', pool,
1296 '--num-objects', num_objects,
1297 '-b', size,
1298 'bench', timelimit,
1299 'write'
1300 ]
1301 if not cleanup:
1302 args.append('--no-cleanup')
1303 return self.do_rados(self.controller, map(str, args))
1304
1305 def do_put(self, pool, obj, fname, namespace=None):
1306 """
1307 Implement rados put operation
1308 """
1309 args = ['-p', pool]
1310 if namespace is not None:
1311 args += ['-N', namespace]
1312 args += [
1313 'put',
1314 obj,
1315 fname
1316 ]
1317 return self.do_rados(
1318 self.controller,
1319 args,
1320 check_status=False
1321 ).exitstatus
1322
1323 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1324 """
1325 Implement rados get operation
1326 """
1327 args = ['-p', pool]
1328 if namespace is not None:
1329 args += ['-N', namespace]
1330 args += [
1331 'get',
1332 obj,
1333 fname
1334 ]
1335 return self.do_rados(
1336 self.controller,
1337 args,
1338 check_status=False
1339 ).exitstatus
1340
1341 def do_rm(self, pool, obj, namespace=None):
1342 """
1343 Implement rados rm operation
1344 """
1345 args = ['-p', pool]
1346 if namespace is not None:
1347 args += ['-N', namespace]
1348 args += [
1349 'rm',
1350 obj
1351 ]
1352 return self.do_rados(
1353 self.controller,
1354 args,
1355 check_status=False
1356 ).exitstatus
1357
1358 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1359 if stdout is None:
1360 stdout = StringIO()
1361 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1362
1363 def find_remote(self, service_type, service_id):
1364 """
1365 Get the Remote for the host where a particular service runs.
1366
1367 :param service_type: 'mds', 'osd', 'client'
1368 :param service_id: The second part of a role, e.g. '0' for
1369 the role 'client.0'
1370 :return: a Remote instance for the host where the
1371 requested role is placed
1372 """
1373 return get_remote(self.ctx, self.cluster,
1374 service_type, service_id)
1375
1376 def admin_socket(self, service_type, service_id,
1377 command, check_status=True, timeout=0, stdout=None):
1378 """
1379 Remotely start up ceph specifying the admin socket
1380 :param command: a list of words to use as the command
1381 to the admin socket
1382 """
1383 if stdout is None:
1384 stdout = StringIO()
1385 testdir = teuthology.get_testdir(self.ctx)
1386 remote = self.find_remote(service_type, service_id)
1387 args = [
1388 'sudo',
1389 'adjust-ulimits',
1390 'ceph-coverage',
1391 '{tdir}/archive/coverage'.format(tdir=testdir),
1392 'timeout',
1393 str(timeout),
1394 'ceph',
1395 '--cluster',
1396 self.cluster,
1397 '--admin-daemon',
1398 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1399 cluster=self.cluster,
1400 type=service_type,
1401 id=service_id),
1402 ]
1403 args.extend(command)
1404 return remote.run(
1405 args=args,
1406 stdout=stdout,
1407 wait=True,
1408 check_status=check_status
1409 )
1410
1411 def objectstore_tool(self, pool, options, args, **kwargs):
1412 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1413
1414 def get_pgid(self, pool, pgnum):
1415 """
1416 :param pool: pool name
1417 :param pgnum: pg number
1418 :returns: a string representing this pg.
1419 """
1420 poolnum = self.get_pool_num(pool)
1421 pg_str = "{poolnum}.{pgnum}".format(
1422 poolnum=poolnum,
1423 pgnum=pgnum)
1424 return pg_str
1425
1426 def get_pg_replica(self, pool, pgnum):
1427 """
1428 get a replica osd id for (pool, pgnum), e.g. (data, 0) -> 0
1429 """
1430 pg_str = self.get_pgid(pool, pgnum)
1431 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1432 j = json.loads('\n'.join(output.split('\n')[1:]))
1433 return int(j['acting'][-1])
1434 assert False
1435
1436 def wait_for_pg_stats(func):
1437 # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1438 # by default, and take the faulty injection in ms into consideration,
1439 # 12 seconds are more than enough
1440 delays = [1, 1, 2, 3, 5, 8, 13, 0]
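# retry with increasing (Fibonacci-like) delays; the trailing 0 skips
# the sleep after the final attempt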
1441 @wraps(func)
1442 def wrapper(self, *args, **kwargs):
1443 exc = None
1444 for delay in delays:
1445 try:
1446 return func(self, *args, **kwargs)
1447 except AssertionError as e:
1448 time.sleep(delay)
1449 exc = e
1450 raise exc
1451 return wrapper
1452
1453 def get_pg_primary(self, pool, pgnum):
1454 """
1455 get the primary osd id for (pool, pgnum), e.g. (data, 0) -> 0
1456 """
1457 pg_str = self.get_pgid(pool, pgnum)
1458 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1459 j = json.loads('\n'.join(output.split('\n')[1:]))
1460 return int(j['acting'][0])
1461 assert False
1462
1463 def get_pool_num(self, pool):
1464 """
1465 get number for pool (e.g., data -> 2)
1466 """
1467 return int(self.get_pool_dump(pool)['pool'])
1468
1469 def list_pools(self):
1470 """
1471 list all pool names
1472 """
1473 osd_dump = self.get_osd_dump_json()
1474 self.log(osd_dump['pools'])
1475 return [str(i['pool_name']) for i in osd_dump['pools']]
1476
1477 def clear_pools(self):
1478 """
1479 remove all pools
1480 """
1481 [self.remove_pool(i) for i in self.list_pools()]
1482
1483 def kick_recovery_wq(self, osdnum):
1484 """
1485 Run kick_recovery_wq on cluster.
1486 """
1487 return self.raw_cluster_cmd(
1488 'tell', "osd.%d" % (int(osdnum),),
1489 'debug',
1490 'kick_recovery_wq',
1491 '0')
1492
1493 def wait_run_admin_socket(self, service_type,
1494 service_id, args=['version'], timeout=75, stdout=None):
1495 """
1496 If the admin socket call succeeds, return. Otherwise wait
1497 five seconds and try again.
1498 """
1499 if stdout is None:
1500 stdout = StringIO()
1501 tries = 0
1502 while True:
1503 proc = self.admin_socket(service_type, service_id,
1504 args, check_status=False, stdout=stdout)
1505 if proc.exitstatus == 0:
1506 return proc
1507 else:
1508 tries += 1
1509 if (tries * 5) > timeout:
1510 raise Exception('timed out waiting for admin_socket '
1511 'to appear after {type}.{id} restart'.
1512 format(type=service_type,
1513 id=service_id))
1514 self.log("waiting on admin_socket for {type}-{id}, "
1515 "{command}".format(type=service_type,
1516 id=service_id,
1517 command=args))
1518 time.sleep(5)
1519
1520 def get_pool_dump(self, pool):
1521 """
1522 get the osd dump part of a pool
1523 """
1524 osd_dump = self.get_osd_dump_json()
1525 for i in osd_dump['pools']:
1526 if i['pool_name'] == pool:
1527 return i
1528 assert False
1529
1530 def get_config(self, service_type, service_id, name):
1531 """
1532 :param service_type, service_id: which daemon, e.g. ('mon', 'a')
1533 :param name: the option name
1534 """
1535 proc = self.wait_run_admin_socket(service_type, service_id,
1536 ['config', 'show'])
1537 j = json.loads(proc.stdout.getvalue())
1538 return j[name]
1539
1540 def set_config(self, osdnum, **argdict):
1541 """
1542 :param osdnum: osd number
1543 :param argdict: dictionary containing values to set.
1544 """
1545 for k, v in argdict.iteritems():
1546 self.wait_run_admin_socket(
1547 'osd', osdnum,
1548 ['config', 'set', str(k), str(v)])
1549
1550 def raw_cluster_status(self):
1551 """
1552 Get status from cluster
1553 """
1554 status = self.raw_cluster_cmd('status', '--format=json-pretty')
1555 return json.loads(status)
1556
1557 def raw_osd_status(self):
1558 """
1559 Get osd status from cluster
1560 """
1561 return self.raw_cluster_cmd('osd', 'dump')
1562
1563 def get_osd_status(self):
1564 """
1565 Get osd statuses sorted by states that the osds are in.
1566 """
1567 osd_lines = filter(
1568 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1569 self.raw_osd_status().split('\n'))
1570 self.log(osd_lines)
1571 in_osds = [int(i[4:].split()[0])
1572 for i in filter(lambda x: " in " in x, osd_lines)]
1573 out_osds = [int(i[4:].split()[0])
1574 for i in filter(lambda x: " out " in x, osd_lines)]
1575 up_osds = [int(i[4:].split()[0])
1576 for i in filter(lambda x: " up " in x, osd_lines)]
1577 down_osds = [int(i[4:].split()[0])
1578 for i in filter(lambda x: " down " in x, osd_lines)]
1579 dead_osds = [int(x.id_)
1580 for x in filter(lambda x:
1581 not x.running(),
1582 self.ctx.daemons.
1583 iter_daemons_of_role('osd', self.cluster))]
1584 live_osds = [int(x.id_) for x in
1585 filter(lambda x:
1586 x.running(),
1587 self.ctx.daemons.iter_daemons_of_role('osd',
1588 self.cluster))]
1589 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1590 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1591 'raw': osd_lines}
1592
1593 def get_num_pgs(self):
1594 """
1595 Check cluster status for the number of pgs
1596 """
1597 status = self.raw_cluster_status()
1598 self.log(status)
1599 return status['pgmap']['num_pgs']
1600
1601 def create_erasure_code_profile(self, profile_name, profile):
1602 """
1603 Create an erasure code profile name that can be used as a parameter
1604 when creating an erasure coded pool.
1605 """
1606 with self.lock:
1607 args = cmd_erasure_code_profile(profile_name, profile)
1608 self.raw_cluster_cmd(*args)
1609
1610 def create_pool_with_unique_name(self, pg_num=16,
1611 erasure_code_profile_name=None,
1612 min_size=None,
1613 erasure_code_use_overwrites=False):
1614 """
1615 Create a pool named unique_pool_X where X is unique.
1616 """
1617 name = ""
1618 with self.lock:
1619 name = "unique_pool_%s" % (str(self.next_pool_id),)
1620 self.next_pool_id += 1
1621 self.create_pool(
1622 name,
1623 pg_num,
1624 erasure_code_profile_name=erasure_code_profile_name,
1625 min_size=min_size,
1626 erasure_code_use_overwrites=erasure_code_use_overwrites)
1627 return name
1628
1629 @contextlib.contextmanager
1630 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1631 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1632 yield
1633 self.remove_pool(pool_name)
1634
1635 def create_pool(self, pool_name, pg_num=16,
1636 erasure_code_profile_name=None,
1637 min_size=None,
1638 erasure_code_use_overwrites=False):
1639 """
1640 Create a pool named from the pool_name parameter.
1641 :param pool_name: name of the pool being created.
1642 :param pg_num: initial number of pgs.
1643 :param erasure_code_profile_name: if set and !None create an
1644 erasure coded pool using the profile
1645 :param erasure_code_use_overwrites: if true, allow overwrites
1646 """
1647 with self.lock:
1648 assert isinstance(pool_name, basestring)
1649 assert isinstance(pg_num, int)
1650 assert pool_name not in self.pools
1651 self.log("creating pool_name %s" % (pool_name,))
1652 if erasure_code_profile_name:
1653 self.raw_cluster_cmd('osd', 'pool', 'create',
1654 pool_name, str(pg_num), str(pg_num),
1655 'erasure', erasure_code_profile_name)
1656 else:
1657 self.raw_cluster_cmd('osd', 'pool', 'create',
1658 pool_name, str(pg_num))
1659 if min_size is not None:
1660 self.raw_cluster_cmd(
1661 'osd', 'pool', 'set', pool_name,
1662 'min_size',
1663 str(min_size))
1664 if erasure_code_use_overwrites:
1665 self.raw_cluster_cmd(
1666 'osd', 'pool', 'set', pool_name,
1667 'allow_ec_overwrites',
1668 'true')
1669 self.raw_cluster_cmd(
1670 'osd', 'pool', 'application', 'enable',
1671 pool_name, 'rados', '--yes-i-really-mean-it',
1672 run.Raw('||'), 'true')
1673 self.pools[pool_name] = pg_num
1674 time.sleep(1)
1675
1676 def add_pool_snap(self, pool_name, snap_name):
1677 """
1678 Add pool snapshot
1679 :param pool_name: name of pool to snapshot
1680 :param snap_name: name of snapshot to take
1681 """
1682 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1683 str(pool_name), str(snap_name))
1684
1685 def remove_pool_snap(self, pool_name, snap_name):
1686 """
1687 Remove pool snapshot
1688 :param pool_name: name of pool to snapshot
1689 :param snap_name: name of snapshot to remove
1690 """
1691 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1692 str(pool_name), str(snap_name))
1693
1694 def remove_pool(self, pool_name):
1695 """
1696 Remove the indicated pool
1697 :param pool_name: Pool to be removed
1698 """
1699 with self.lock:
1700 assert isinstance(pool_name, basestring)
1701 assert pool_name in self.pools
1702 self.log("removing pool_name %s" % (pool_name,))
1703 del self.pools[pool_name]
1704 self.do_rados(self.controller,
1705 ['rmpool', pool_name, pool_name,
1706 "--yes-i-really-really-mean-it"])
1707
1708 def get_pool(self):
1709 """
1710 Pick a random pool
1711 """
1712 with self.lock:
1713 return random.choice(self.pools.keys())
1714
1715 def get_pool_pg_num(self, pool_name):
1716 """
1717 Return the number of pgs in the pool specified.
1718 """
1719 with self.lock:
1720 assert isinstance(pool_name, basestring)
1721 if pool_name in self.pools:
1722 return self.pools[pool_name]
1723 return 0
1724
1725 def get_pool_property(self, pool_name, prop):
1726 """
1727 :param pool_name: pool
1728 :param prop: property to be checked.
1729 :returns: property as an int value.
1730 """
1731 with self.lock:
1732 assert isinstance(pool_name, basestring)
1733 assert isinstance(prop, basestring)
1734 output = self.raw_cluster_cmd(
1735 'osd',
1736 'pool',
1737 'get',
1738 pool_name,
1739 prop)
1740 return int(output.split()[1])
1741
1742 def set_pool_property(self, pool_name, prop, val):
1743 """
1744 :param pool_name: pool
1745 :param prop: property to be set.
1746 :param val: value to set.
1747
1748 This routine retries if set operation fails.
1749 """
1750 with self.lock:
1751 assert isinstance(pool_name, basestring)
1752 assert isinstance(prop, basestring)
1753 assert isinstance(val, int)
1754 tries = 0
1755 while True:
1756 r = self.raw_cluster_cmd_result(
1757 'osd',
1758 'pool',
1759 'set',
1760 pool_name,
1761 prop,
1762 str(val))
1763 if r != 11: # EAGAIN
1764 break
1765 tries += 1
1766 if tries > 50:
1767 raise Exception('still getting EAGAIN after 50 tries '
1768 'while setting pool property %s %s = %s' %
1769 (pool_name, prop, val))
1770 self.log('got EAGAIN setting pool property, '
1771 'waiting a few seconds...')
1772 time.sleep(2)
1773
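    # Illustrative sketch: a get_pool_property()/set_pool_property() round
    # trip.  The pool name 'unittest' is hypothetical; the retry loop above
    # absorbs the transient EAGAIN the mons may return while earlier pg_num
    # changes are still being applied.
    #
    #   size = manager.get_pool_property('unittest', 'size')
    #   manager.set_pool_property('unittest', 'min_size', max(1, size - 1))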
1774 def expand_pool(self, pool_name, by, max_pgs):
1775 """
1776 Increase the number of pgs in a pool
1777 """
1778 with self.lock:
1779 assert isinstance(pool_name, basestring)
1780 assert isinstance(by, int)
1781 assert pool_name in self.pools
1782 if self.get_num_creating() > 0:
1783 return False
1784 if (self.pools[pool_name] + by) > max_pgs:
1785 return False
1786 self.log("increase pool size by %d" % (by,))
1787 new_pg_num = self.pools[pool_name] + by
1788 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1789 self.pools[pool_name] = new_pg_num
1790 return True
1791
1792 def set_pool_pgpnum(self, pool_name, force):
1793 """
1794 Set pgpnum property of pool_name pool.
1795 """
1796 with self.lock:
1797 assert isinstance(pool_name, basestring)
1798 assert pool_name in self.pools
1799 if not force and self.get_num_creating() > 0:
1800 return False
1801 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1802 return True
1803
1804 def list_pg_missing(self, pgid):
1805 """
1806 Return the list of missing objects for the pg with the id specified
1807 """
1808 r = None
1809 offset = {}
1810 while True:
1811 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
1812 json.dumps(offset))
1813 j = json.loads(out)
1814 if r is None:
1815 r = j
1816 else:
1817 r['objects'].extend(j['objects'])
1818 if 'more' not in j:
1819 break
1820 if j['more'] == 0:
1821 break
1822 offset = j['objects'][-1]['oid']
1823 if 'more' in r:
1824 del r['more']
1825 return r
1826
1827 def get_pg_stats(self):
1828 """
1829 Dump the cluster and get pg stats
1830 """
1831 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
1832 j = json.loads('\n'.join(out.split('\n')[1:]))
1833 return j['pg_stats']
1834
1835 def get_pgids_to_force(self, backfill):
1836 """
1837 Return the randomized list of PGs that can have their recovery/backfill forced
1838 """
1839 j = self.get_pg_stats()
1840 pgids = []
1841 if backfill:
1842 wanted = ['degraded', 'backfilling', 'backfill_wait']
1843 else:
1844 wanted = ['recovering', 'degraded', 'recovery_wait']
1845 for pg in j:
1846 status = pg['state'].split('+')
1847 for t in wanted:
1848 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
1849 pgids.append(pg['pgid'])
1850 break
1851 return pgids
1852
1853 def get_pgids_to_cancel_force(self, backfill):
1854 """
1855 Return the randomized list of PGs whose recovery/backfill priority is forced
1856 """
1857 j = self.get_pg_stats()
1858 pgids = []
1859 if backfill:
1860 wanted = 'forced_backfill'
1861 else:
1862 wanted = 'forced_recovery'
1863 for pg in j:
1864 status = pg['state'].split('+')
1865 if wanted in status and random.random() > 0.5:
1866 pgids.append(pg['pgid'])
1867 return pgids
1868
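    # Illustrative sketch: how a thrasher might consume the two helpers above,
    # assuming the `ceph pg force-recovery` / `force-backfill` commands (and
    # their cancel-* counterparts) introduced in Luminous.
    #
    #   pgids = manager.get_pgids_to_force(backfill=False)
    #   if pgids:
    #       manager.raw_cluster_cmd('pg', 'force-recovery', *pgids)
    #   ...
    #   pgids = manager.get_pgids_to_cancel_force(backfill=False)
    #   if pgids:
    #       manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *pgids)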
1869 def compile_pg_status(self):
1870 """
1871 Return a histogram of pg state values
1872 """
1873 ret = {}
1874 j = self.get_pg_stats()
1875 for pg in j:
1876 for status in pg['state'].split('+'):
1877 if status not in ret:
1878 ret[status] = 0
1879 ret[status] += 1
1880 return ret
1881
1882 @wait_for_pg_stats
1883 def with_pg_state(self, pool, pgnum, check):
1884 pgstr = self.get_pgid(pool, pgnum)
1885 stats = self.get_single_pg_stats(pgstr)
1886 assert(check(stats['state']))
1887
1888 @wait_for_pg_stats
1889 def with_pg(self, pool, pgnum, check):
1890 pgstr = self.get_pgid(pool, pgnum)
1891 stats = self.get_single_pg_stats(pgstr)
1892 return check(stats)
1893
1894 def get_last_scrub_stamp(self, pool, pgnum):
1895 """
1896 Get the timestamp of the last scrub.
1897 """
1898 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
1899 return stats["last_scrub_stamp"]
1900
1901 def do_pg_scrub(self, pool, pgnum, stype):
1902 """
1903 Scrub pg and wait for scrubbing to finish
1904 """
1905 init = self.get_last_scrub_stamp(pool, pgnum)
1906 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
1907 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1908 SLEEP_TIME = 10
1909 timer = 0
1910 while init == self.get_last_scrub_stamp(pool, pgnum):
1911 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1912 self.log("waiting for scrub type %s" % (stype,))
1913 if (timer % RESEND_TIMEOUT) == 0:
1914 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1915 # The first time in this loop is the actual request
1916 if timer != 0 and stype == "repair":
1917 self.log("WARNING: Resubmitted a non-idempotent repair")
1918 time.sleep(SLEEP_TIME)
1919 timer += SLEEP_TIME
1920
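    # Illustrative sketch: deep-scrub pg 0 of a (hypothetical) pool and block
    # until its scrub stamp changes.  stype may be 'scrub', 'deep-scrub' or
    # 'repair'; note the warning above about resending a non-idempotent
    # repair if the first request appears to have been dropped.
    #
    #   manager.do_pg_scrub('unittest', 0, 'deep-scrub')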
1921 def wait_snap_trimming_complete(self, pool):
1922 """
1923 Wait for snap trimming on pool to end
1924 """
1925 POLL_PERIOD = 10
1926 FATAL_TIMEOUT = 600
1927 start = time.time()
1928 poolnum = self.get_pool_num(pool)
1929 poolnumstr = "%s." % (poolnum,)
1930 while True:
1931 now = time.time()
1932 if (now - start) > FATAL_TIMEOUT:
1933 raise AssertionError(
1934 'failed to complete snap trimming before timeout')
1935 all_stats = self.get_pg_stats()
1936 trimming = False
1937 for pg in all_stats:
1938 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
1939 self.log("pg {pg} in trimming, state: {state}".format(
1940 pg=pg['pgid'],
1941 state=pg['state']))
1942 trimming = True
1943 if not trimming:
1944 break
1945 self.log("{pool} still trimming, waiting".format(pool=pool))
1946 time.sleep(POLL_PERIOD)
1947
1948 def get_single_pg_stats(self, pgid):
1949 """
1950 Return pg for the pgid specified.
1951 """
1952 all_stats = self.get_pg_stats()
1953
1954 for pg in all_stats:
1955 if pg['pgid'] == pgid:
1956 return pg
1957
1958 return None
1959
1960 def get_object_pg_with_shard(self, pool, name, osdid):
1961 """Return the pgid of the given object; for erasure coded pools
1962 the given osd's shard index is appended as "s<shard>"."""
1963 pool_dump = self.get_pool_dump(pool)
1964 object_map = self.get_object_map(pool, name)
1965 if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1966 shard = object_map['acting'].index(osdid)
1967 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1968 shard=shard)
1969 else:
1970 return object_map['pgid']
1971
1972 def get_object_primary(self, pool, name):
1973 """Return the id of the acting primary osd
1974 for the given object in the given pool."""
1975 object_map = self.get_object_map(pool, name)
1976 return object_map['acting_primary']
1977
1978 def get_object_map(self, pool, name):
1979 """
1980 osd map --format=json converted to a python object
1981 :returns: the python object
1982 """
1983 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
1984 return json.loads('\n'.join(out.split('\n')[1:]))
1985
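    # Illustrative sketch: locate an object's acting primary and express its
    # pg as seen from that osd (erasure coded pools get an "s<shard>" suffix).
    # The pool and object names are hypothetical.
    #
    #   primary = manager.get_object_primary('unittest-ec', 'obj0')
    #   pgid = manager.get_object_pg_with_shard('unittest-ec', 'obj0', primary)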
1986 def get_osd_dump_json(self):
1987 """
1988 osd dump --format=json converted to a python object
1989 :returns: the python object
1990 """
1991 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
1992 return json.loads('\n'.join(out.split('\n')[1:]))
1993
1994 def get_osd_dump(self):
1995 """
1996 Dump osds
1997 :returns: all osds
1998 """
1999 return self.get_osd_dump_json()['osds']
2000
2001 def get_mgr_dump(self):
2002 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2003 return json.loads(out)
2004
2005 def get_stuck_pgs(self, type_, threshold):
2006 """
2007 :returns: stuck pg information from the cluster
2008 """
2009 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2010 '--format=json')
2011 return json.loads(out)
2012
2013 def get_num_unfound_objects(self):
2014 """
2015 Check cluster status to get the number of unfound objects
2016 """
2017 status = self.raw_cluster_status()
2018 self.log(status)
2019 return status['pgmap'].get('unfound_objects', 0)
2020
2021 def get_num_creating(self):
2022 """
2023 Find the number of pgs in creating mode.
2024 """
2025 pgs = self.get_pg_stats()
2026 num = 0
2027 for pg in pgs:
2028 if 'creating' in pg['state']:
2029 num += 1
2030 return num
2031
2032 def get_num_active_clean(self):
2033 """
2034 Find the number of active and clean pgs.
2035 """
2036 pgs = self.get_pg_stats()
2037 num = 0
2038 for pg in pgs:
2039 if (pg['state'].count('active') and
2040 pg['state'].count('clean') and
2041 not pg['state'].count('stale')):
2042 num += 1
2043 return num
2044
2045 def get_num_active_recovered(self):
2046 """
2047 Find the number of active and recovered pgs.
2048 """
2049 pgs = self.get_pg_stats()
2050 num = 0
2051 for pg in pgs:
2052 if (pg['state'].count('active') and
2053 not pg['state'].count('recover') and
2054 not pg['state'].count('backfilling') and
2055 not pg['state'].count('stale')):
2056 num += 1
2057 return num
2058
2059 def get_is_making_recovery_progress(self):
2060 """
2061 Return whether there is recovery progress discernible in the
2062 raw cluster status
2063 """
2064 status = self.raw_cluster_status()
2065 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2066 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2067 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2068 return kps > 0 or bps > 0 or ops > 0
2069
2070 def get_num_active(self):
2071 """
2072 Find the number of active pgs.
2073 """
2074 pgs = self.get_pg_stats()
2075 num = 0
2076 for pg in pgs:
2077 if pg['state'].count('active') and not pg['state'].count('stale'):
2078 num += 1
2079 return num
2080
2081 def get_num_down(self):
2082 """
2083 Find the number of pgs that are down.
2084 """
2085 pgs = self.get_pg_stats()
2086 num = 0
2087 for pg in pgs:
2088 if ((pg['state'].count('down') and not
2089 pg['state'].count('stale')) or
2090 (pg['state'].count('incomplete') and not
2091 pg['state'].count('stale'))):
2092 num += 1
2093 return num
2094
2095 def get_num_active_down(self):
2096 """
2097 Find the number of pgs that are either active or down.
2098 """
2099 pgs = self.get_pg_stats()
2100 num = 0
2101 for pg in pgs:
2102 if ((pg['state'].count('active') and not
2103 pg['state'].count('stale')) or
2104 (pg['state'].count('down') and not
2105 pg['state'].count('stale')) or
2106 (pg['state'].count('incomplete') and not
2107 pg['state'].count('stale'))):
2108 num += 1
2109 return num
2110
2111 def is_clean(self):
2112 """
2113 True if all pgs are clean
2114 """
2115 return self.get_num_active_clean() == self.get_num_pgs()
2116
2117 def is_recovered(self):
2118 """
2119 True if all pgs have recovered
2120 """
2121 return self.get_num_active_recovered() == self.get_num_pgs()
2122
2123 def is_active_or_down(self):
2124 """
2125 True if all pgs are active or down
2126 """
2127 return self.get_num_active_down() == self.get_num_pgs()
2128
2129 def wait_for_clean(self, timeout=None):
2130 """
2131 Wait until all pgs are clean; fail if the timeout expires first.
2132 """
2133 self.log("waiting for clean")
2134 start = time.time()
2135 num_active_clean = self.get_num_active_clean()
2136 while not self.is_clean():
2137 if timeout is not None:
2138 if self.get_is_making_recovery_progress():
2139 self.log("making progress, resetting timeout")
2140 start = time.time()
2141 else:
2142 self.log("no progress seen, keeping timeout for now")
2143 if time.time() - start >= timeout:
2144 self.log('dumping pgs')
2145 out = self.raw_cluster_cmd('pg', 'dump')
2146 self.log(out)
2147 assert time.time() - start < timeout, \
2148 'failed to become clean before timeout expired'
2149 cur_active_clean = self.get_num_active_clean()
2150 if cur_active_clean != num_active_clean:
2151 start = time.time()
2152 num_active_clean = cur_active_clean
2153 time.sleep(3)
2154 self.log("clean!")
2155
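    # Illustrative sketch: the usual "disturb the cluster, then settle"
    # pattern.  The timeout is only fatal while no recovery progress is being
    # reported, because wait_for_clean() resets its start time whenever it
    # sees progress or the active+clean count changes.
    #
    #   manager.mark_out_osd(0)
    #   manager.wait_for_clean(timeout=1200)
    #   manager.mark_in_osd(0)
    #   manager.wait_for_clean(timeout=1200)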
2156 def are_all_osds_up(self):
2157 """
2158 Returns true if all osds are up.
2159 """
2160 x = self.get_osd_dump()
2161 return (len(x) == sum([(y['up'] > 0) for y in x]))
2162
2163 def wait_for_all_osds_up(self, timeout=None):
2164 """
2165 When this exits, either the timeout has expired, or all
2166 osds are up.
2167 """
2168 self.log("waiting for all up")
2169 start = time.time()
2170 while not self.are_all_osds_up():
2171 if timeout is not None:
2172 assert time.time() - start < timeout, \
2173 'timeout expired in wait_for_all_osds_up'
2174 time.sleep(3)
2175 self.log("all up!")
2176
2177 def pool_exists(self, pool):
2178 if pool in self.list_pools():
2179 return True
2180 return False
2181
2182 def wait_for_pool(self, pool, timeout=300):
2183 """
2184 Wait for a pool to exist
2185 """
2186 self.log('waiting for pool %s to exist' % pool)
2187 start = time.time()
2188 while not self.pool_exists(pool):
2189 if timeout is not None:
2190 assert time.time() - start < timeout, \
2191 'timeout expired in wait_for_pool'
2192 time.sleep(3)
2193
2194 def wait_for_pools(self, pools):
2195 for pool in pools:
2196 self.wait_for_pool(pool)
2197
2198 def is_mgr_available(self):
2199 x = self.get_mgr_dump()
2200 return x.get('available', False)
2201
2202 def wait_for_mgr_available(self, timeout=None):
2203 self.log("waiting for mgr available")
2204 start = time.time()
2205 while not self.is_mgr_available():
2206 if timeout is not None:
2207 assert time.time() - start < timeout, \
2208 'timeout expired in wait_for_mgr_available'
2209 time.sleep(3)
2210 self.log("mgr available!")
2211
2212 def wait_for_recovery(self, timeout=None):
2213 """
2214 Check peering. When this exits, we have recovered.
2215 """
2216 self.log("waiting for recovery to complete")
2217 start = time.time()
2218 num_active_recovered = self.get_num_active_recovered()
2219 while not self.is_recovered():
2220 now = time.time()
2221 if timeout is not None:
2222 if self.get_is_making_recovery_progress():
2223 self.log("making progress, resetting timeout")
2224 start = time.time()
2225 else:
2226 self.log("no progress seen, keeping timeout for now")
2227 if now - start >= timeout:
2228 if self.is_recovered():
2229 break
2230 self.log('dumping pgs')
2231 out = self.raw_cluster_cmd('pg', 'dump')
2232 self.log(out)
2233 assert now - start < timeout, \
2234 'failed to recover before timeout expired'
2235 cur_active_recovered = self.get_num_active_recovered()
2236 if cur_active_recovered != num_active_recovered:
2237 start = time.time()
2238 num_active_recovered = cur_active_recovered
2239 time.sleep(3)
2240 self.log("recovered!")
2241
2242 def wait_for_active(self, timeout=None):
2243 """
2244 Check peering. When this exits, we are definitely active
2245 """
2246 self.log("waiting for peering to complete")
2247 start = time.time()
2248 num_active = self.get_num_active()
2249 while not self.is_active():
2250 if timeout is not None:
2251 if time.time() - start >= timeout:
2252 self.log('dumping pgs')
2253 out = self.raw_cluster_cmd('pg', 'dump')
2254 self.log(out)
2255 assert time.time() - start < timeout, \
2256 'failed to recover before timeout expired'
2257 cur_active = self.get_num_active()
2258 if cur_active != num_active:
2259 start = time.time()
2260 num_active = cur_active
2261 time.sleep(3)
2262 self.log("active!")
2263
2264 def wait_for_active_or_down(self, timeout=None):
2265 """
2266 Check peering. When this exits, we are definitely either
2267 active or down
2268 """
2269 self.log("waiting for peering to complete or become blocked")
2270 start = time.time()
2271 num_active_down = self.get_num_active_down()
2272 while not self.is_active_or_down():
2273 if timeout is not None:
2274 if time.time() - start >= timeout:
2275 self.log('dumping pgs')
2276 out = self.raw_cluster_cmd('pg', 'dump')
2277 self.log(out)
2278 assert time.time() - start < timeout, \
2279 'failed to recover before timeout expired'
2280 cur_active_down = self.get_num_active_down()
2281 if cur_active_down != num_active_down:
2282 start = time.time()
2283 num_active_down = cur_active_down
2284 time.sleep(3)
2285 self.log("active or down!")
2286
2287 def osd_is_up(self, osd):
2288 """
2289 Return True if the given osd is marked up in the osd map.
2290 """
2291 osds = self.get_osd_dump()
2292 return osds[osd]['up'] > 0
2293
2294 def wait_till_osd_is_up(self, osd, timeout=None):
2295 """
2296 Loop waiting for osd.
2297 """
2298 self.log('waiting for osd.%d to be up' % osd)
2299 start = time.time()
2300 while not self.osd_is_up(osd):
2301 if timeout is not None:
2302 assert time.time() - start < timeout, \
2303 'osd.%d failed to come up before timeout expired' % osd
2304 time.sleep(3)
2305 self.log('osd.%d is up' % osd)
2306
2307 def is_active(self):
2308 """
2309 Wrapper to check if all pgs are active
2310 """
2311 return self.get_num_active() == self.get_num_pgs()
2312
2313 def wait_till_active(self, timeout=None):
2314 """
2315 Wait until all pgs are active.
2316 """
2317 self.log("waiting till active")
2318 start = time.time()
2319 while not self.is_active():
2320 if timeout is not None:
2321 if time.time() - start >= timeout:
2322 self.log('dumping pgs')
2323 out = self.raw_cluster_cmd('pg', 'dump')
2324 self.log(out)
2325 assert time.time() - start < timeout, \
2326 'failed to become active before timeout expired'
2327 time.sleep(3)
2328 self.log("active!")
2329
2330 def wait_till_pg_convergence(self, timeout=None):
2331 start = time.time()
2332 old_stats = None
2333 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2334 if osd['in'] and osd['up']]
2335 while True:
2336 # strictly speaking, no need to wait for mon. but due to the
2337 # "ms inject socket failures" setting, the osdmap could be delayed,
2338 # so mgr is likely to ignore the pg-stat messages with pgs serving
2339 # newly created pools which is not yet known by mgr. so, to make sure
2340 # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
2341 # necessary.
2342 self.flush_pg_stats(active_osds)
2343 new_stats = dict((stat['pgid'], stat['state'])
2344 for stat in self.get_pg_stats())
2345 if old_stats == new_stats:
2346 return old_stats
2347 if timeout is not None:
2348 assert time.time() - start < timeout, \
2349 'failed to reach convergence before %d secs' % timeout
2350 old_stats = new_stats
2351 # longer than mgr_stats_period
2352 time.sleep(5 + 1)
2353
2354 def mark_out_osd(self, osd):
2355 """
2356 Wrapper to mark osd out.
2357 """
2358 self.raw_cluster_cmd('osd', 'out', str(osd))
2359
2360 def kill_osd(self, osd):
2361 """
2362 Kill osds by either power cycling (if indicated by the config)
2363 or by stopping.
2364 """
2365 if self.config.get('powercycle'):
2366 remote = self.find_remote('osd', osd)
2367 self.log('kill_osd on osd.{o} '
2368 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2369 self._assert_ipmi(remote)
2370 remote.console.power_off()
2371 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2372 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2373 self.raw_cluster_cmd(
2374 '--', 'tell', 'osd.%d' % osd,
2375 'injectargs',
2376 '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
2377 )
2378 try:
2379 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2380 except:
2381 pass
2382 else:
2383 raise RuntimeError('osd.%s did not fail' % osd)
2384 else:
2385 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2386 else:
2387 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2388
2389 @staticmethod
2390 def _assert_ipmi(remote):
2391 assert remote.console.has_ipmi_credentials, (
2392 "powercycling requested but RemoteConsole is not "
2393 "initialized. Check ipmi config.")
2394
2395 def blackhole_kill_osd(self, osd):
2396 """
2397 Stop osd if nothing else works.
2398 """
2399 self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
2400 'injectargs',
2401 '--objectstore-blackhole')
2402 time.sleep(2)
2403 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2404
2405 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
2406 """
2407 Revive osds by either power cycling (if indicated by the config)
2408 or by restarting.
2409 """
2410 if self.config.get('powercycle'):
2411 remote = self.find_remote('osd', osd)
2412 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2413 format(o=osd, s=remote.name))
2414 self._assert_ipmi(remote)
2415 remote.console.power_on()
2416 if not remote.console.check_status(300):
2417 raise Exception('Failed to revive osd.{o} via ipmi'.
2418 format(o=osd))
2419 teuthology.reconnect(self.ctx, 60, [remote])
2420 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2421 self.make_admin_daemon_dir(remote)
2422 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2423 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2424
2425 if not skip_admin_check:
2426 # wait for dump_ops_in_flight; this command doesn't appear
2427 # until after the signal handler is installed and it is safe
2428 # to stop the osd again without making valgrind leak checks
2429 # unhappy. see #5924.
2430 self.wait_run_admin_socket('osd', osd,
2431 args=['dump_ops_in_flight'],
2432 timeout=timeout, stdout=DEVNULL)
2433
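    # Illustrative sketch: a kill/revive cycle as a thrasher would perform it.
    # Unless skip_admin_check is set, revive_osd() waits for the admin socket
    # to answer dump_ops_in_flight before returning, so the osd is safe to
    # thrash again afterwards.
    #
    #   manager.kill_osd(3)
    #   manager.mark_down_osd(3)
    #   manager.revive_osd(3, timeout=360)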
2434 def mark_down_osd(self, osd):
2435 """
2436 Cluster command wrapper
2437 """
2438 self.raw_cluster_cmd('osd', 'down', str(osd))
2439
2440 def mark_in_osd(self, osd):
2441 """
2442 Cluster command wrapper
2443 """
2444 self.raw_cluster_cmd('osd', 'in', str(osd))
2445
2446 def signal_osd(self, osd, sig, silent=False):
2447 """
2448 Wrapper to local get_daemon call which sends the given
2449 signal to the given osd.
2450 """
2451 self.ctx.daemons.get_daemon('osd', osd,
2452 self.cluster).signal(sig, silent=silent)
2453
2454 ## monitors
2455 def signal_mon(self, mon, sig, silent=False):
2456 """
2457 Wrapper to local get_daemon call which sends the given signal to the given mon.
2458 """
2459 self.ctx.daemons.get_daemon('mon', mon,
2460 self.cluster).signal(sig, silent=silent)
2461
2462 def kill_mon(self, mon):
2463 """
2464 Kill the monitor by either power cycling (if the config says so),
2465 or by doing a stop.
2466 """
2467 if self.config.get('powercycle'):
2468 remote = self.find_remote('mon', mon)
2469 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2470 format(m=mon, s=remote.name))
2471 self._assert_ipmi(remote)
2472 remote.console.power_off()
2473 else:
2474 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2475
2476 def revive_mon(self, mon):
2477 """
2478 Restart by either power cycling (if the config says so),
2479 or by doing a normal restart.
2480 """
2481 if self.config.get('powercycle'):
2482 remote = self.find_remote('mon', mon)
2483 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2484 format(m=mon, s=remote.name))
2485 self._assert_ipmi(remote)
2486 remote.console.power_on()
2487 self.make_admin_daemon_dir(remote)
2488 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2489
2490 def revive_mgr(self, mgr):
2491 """
2492 Restart by either power cycling (if the config says so),
2493 or by doing a normal restart.
2494 """
2495 if self.config.get('powercycle'):
2496 remote = self.find_remote('mgr', mgr)
2497 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2498 format(m=mgr, s=remote.name))
2499 self._assert_ipmi(remote)
2500 remote.console.power_on()
2501 self.make_admin_daemon_dir(remote)
2502 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2503
2504 def get_mon_status(self, mon):
2505 """
2506 Extract all the monitor status information from the cluster
2507 """
2508 addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
2509 out = self.raw_cluster_cmd('-m', addr, 'mon_status')
2510 return json.loads(out)
2511
2512 def get_mon_quorum(self):
2513 """
2514 Extract monitor quorum information from the cluster
2515 """
2516 out = self.raw_cluster_cmd('quorum_status')
2517 j = json.loads(out)
2518 self.log('quorum_status is %s' % out)
2519 return j['quorum']
2520
2521 def wait_for_mon_quorum_size(self, size, timeout=300):
2522 """
2523 Loop until quorum size is reached.
2524 """
2525 self.log('waiting for quorum size %d' % size)
2526 start = time.time()
2527 while len(self.get_mon_quorum()) != size:
2528 if timeout is not None:
2529 assert time.time() - start < timeout, \
2530 ('failed to reach quorum size %d '
2531 'before timeout expired' % size)
2532 time.sleep(3)
2533 self.log("quorum is size %d" % size)
2534
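    # Illustrative sketch: take one monitor out of quorum and wait for the
    # remaining monitors to re-form (assumes a cluster with three mons named
    # a, b and c).
    #
    #   manager.kill_mon('a')
    #   manager.wait_for_mon_quorum_size(2, timeout=300)
    #   manager.revive_mon('a')
    #   manager.wait_for_mon_quorum_size(3, timeout=300)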
2535 def get_mon_health(self, debug=False):
2536 """
2537 Extract all the monitor health information.
2538 """
2539 out = self.raw_cluster_cmd('health', '--format=json')
2540 if debug:
2541 self.log('health:\n{h}'.format(h=out))
2542 return json.loads(out)
2543
2544 def get_mds_status(self, mds):
2545 """
2546 Run cluster commands for the mds in order to get mds information
2547 """
2548 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
2549 j = json.loads(' '.join(out.splitlines()[1:]))
2550 # collate; for dup ids, larger gid wins.
2551 for info in j['info'].itervalues():
2552 if info['name'] == mds:
2553 return info
2554 return None
2555
2556 def get_filepath(self):
2557 """
2558 Return path to osd data with {id} needing to be replaced
2559 """
2560 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2561
2562 def make_admin_daemon_dir(self, remote):
2563 """
2564 Create /var/run/ceph directory on remote site.
2565
2566 :param ctx: Context
2567 :param remote: Remote site
2568 """
2569 remote.run(args=['sudo',
2570 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2571
2572
2573 def utility_task(name):
2574 """
2575 Generate ceph_manager subtask corresponding to ceph_manager
2576 method name
2577 """
2578 def task(ctx, config):
2579 if config is None:
2580 config = {}
2581 args = config.get('args', [])
2582 kwargs = config.get('kwargs', {})
2583 cluster = config.get('cluster', 'ceph')
2584 fn = getattr(ctx.managers[cluster], name)
2585 fn(*args, **kwargs)
2586 return task
2587
2588 revive_osd = utility_task("revive_osd")
2589 revive_mon = utility_task("revive_mon")
2590 kill_osd = utility_task("kill_osd")
2591 kill_mon = utility_task("kill_mon")
2592 create_pool = utility_task("create_pool")
2593 remove_pool = utility_task("remove_pool")
2594 wait_for_clean = utility_task("wait_for_clean")
2595 flush_all_pg_stats = utility_task("flush_all_pg_stats")
2596 set_pool_property = utility_task("set_pool_property")
2597 do_pg_scrub = utility_task("do_pg_scrub")
2598 wait_for_pool = utility_task("wait_for_pool")
2599 wait_for_pools = utility_task("wait_for_pools")
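# Illustrative sketch: the utility tasks above are intended to be referenced
# straight from a teuthology job description, assuming teuthology's usual
# <module>.<attribute> task lookup.  The pool name and values below are
# hypothetical.
#
#   tasks:
#   - install:
#   - ceph:
#   - ceph_manager.create_pool:
#       kwargs:
#         pool_name: unittest
#         pg_num: 16
#   - ceph_manager.wait_for_clean:
#       kwargs:
#         timeout: 300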