1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from cStringIO import StringIO
5 from functools import wraps
6 import contextlib
7 import random
8 import signal
9 import time
10 import gevent
11 import base64
12 import json
13 import logging
14 import threading
15 import traceback
16 import os
17 from teuthology import misc as teuthology
18 from tasks.scrub import Scrubber
19 from util.rados import cmd_erasure_code_profile
20 from util import get_remote
21 from teuthology.contextutil import safe_while
22 from teuthology.orchestra.remote import Remote
23 from teuthology.orchestra import run
24 from teuthology.exceptions import CommandFailedError
25
26 try:
27 from subprocess import DEVNULL # py3k
28 except ImportError:
29 DEVNULL = open(os.devnull, 'r+')
30
31 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33 log = logging.getLogger(__name__)
34
35
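# write_conf serializes the in-memory conf (ctx.ceph[cluster].conf) and
# streams it to every remote in parallel: each remote runs a small sudo'd
# python snippet that copies stdin into conf_path, the local StringIO is fed
# to all of those stdins, and then all the writers are waited on.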
36 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
37 conf_fp = StringIO()
38 ctx.ceph[cluster].conf.write(conf_fp)
39 conf_fp.seek(0)
40 writes = ctx.cluster.run(
41 args=[
42 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
44 'sudo', 'python',
45 '-c',
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
48 conf_path,
49 run.Raw('&&'),
50 'sudo', 'chmod', '0644', conf_path,
51 ],
52 stdin=run.PIPE,
53 wait=False)
54 teuthology.feed_many_stdins_and_close(conf_fp, writes)
55 run.wait(writes)
56
57
58 def mount_osd_data(ctx, remote, cluster, osd):
59 """
60 Mount a remote OSD
61
62 :param ctx: Context
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
65 :param osd: Osd name
66 """
67 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68 role = "{0}.osd.{1}".format(cluster, osd)
69 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70 if remote in ctx.disk_config.remote_to_roles_to_dev:
71 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
72 role = alt_role
73 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
74 return
75 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76 mount_options = ctx.disk_config.\
77 remote_to_roles_to_dev_mount_options[remote][role]
78 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
80
81 log.info('Mounting osd.{o}: dev: {d}, host: {n}, cluster: {c}, '
82 'mountpoint: {p}, type: {t}, options: {v}'.format(
83 o=osd, d=dev, n=remote.name, p=mnt, t=fstype,
84 v=mount_options, c=cluster))
85
86 remote.run(
87 args=[
88 'sudo',
89 'mount',
90 '-t', fstype,
91 '-o', ','.join(mount_options),
92 dev,
93 mnt,
94 ]
95 )
96
97
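# Thrasher is driven by a config dict, typically supplied by the thrashosds
# task's yaml.  An illustrative fragment (key names match the
# self.config.get() calls in __init__ and choose_action; the values are
# examples only, not defaults):
#
#   thrashosds:
#     chance_down: 0.5
#     min_in: 3
#     chance_pgnum_grow: 1
#     chance_pgpnum_fix: 1
#     timeout: 1200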
98 class Thrasher:
99 """
100 Object used to thrash Ceph
101 """
102 def __init__(self, manager, config, logger=None):
103 self.ceph_manager = manager
104 self.cluster = manager.cluster
105 self.ceph_manager.wait_for_clean()
106 osd_status = self.ceph_manager.get_osd_status()
107 self.in_osds = osd_status['in']
108 self.live_osds = osd_status['live']
109 self.out_osds = osd_status['out']
110 self.dead_osds = osd_status['dead']
111 self.stopping = False
112 self.logger = logger
113 self.config = config
114 self.revive_timeout = self.config.get("revive_timeout", 150)
115 self.pools_to_fix_pgp_num = set()
116 if self.config.get('powercycle'):
117 self.revive_timeout += 120
118 self.clean_wait = self.config.get('clean_wait', 0)
119 self.minin = self.config.get("min_in", 3)
120 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121 self.sighup_delay = self.config.get('sighup_delay')
122 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123 self.dump_ops_enable = self.config.get('dump_ops_enable')
124 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
127 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
128 self.random_eio = self.config.get('random_eio')
129 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
130
131 num_osds = self.in_osds + self.out_osds
132 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
133 if self.logger is not None:
134 self.log = lambda x: self.logger.info(x)
135 else:
136 def tmp(x):
137 """
138 Implement log behavior
139 """
140 print x
141 self.log = tmp
142 if self.config is None:
143 self.config = dict()
144 # prevent monitor from auto-marking things out while thrasher runs
145 # try both old and new tell syntax, in case we are testing old code
146 self.saved_options = []
147 # assuming that the default settings do not vary from one daemon to
148 # another
149 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
150 opts = [('mon', 'mon_osd_down_out_interval', 0)]
151 for service, opt, new_value in opts:
152 old_value = manager.get_config(first_mon[0],
153 first_mon[1],
154 opt)
155 self.saved_options.append((service, opt, old_value))
156 self._set_config(service, '*', opt, new_value)
157 # initialize ceph_objectstore_tool property - must be done before
158 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
159 if (self.config.get('powercycle') or
160 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
161 self.config.get('disable_objectstore_tool_tests', False)):
162 self.ceph_objectstore_tool = False
163 self.test_rm_past_intervals = False
164 if self.config.get('powercycle'):
165 self.log("Unable to test ceph-objectstore-tool, "
166 "powercycle testing")
167 else:
168 self.log("Unable to test ceph-objectstore-tool, "
169 "not available on all OSD nodes")
170 else:
171 self.ceph_objectstore_tool = \
172 self.config.get('ceph_objectstore_tool', True)
173 self.test_rm_past_intervals = \
174 self.config.get('test_rm_past_intervals', True)
175 # spawn do_thrash
176 self.thread = gevent.spawn(self.do_thrash)
177 if self.sighup_delay:
178 self.sighup_thread = gevent.spawn(self.do_sighup)
179 if self.optrack_toggle_delay:
180 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
181 if self.dump_ops_enable == "true":
182 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
183 if self.noscrub_toggle_delay:
184 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
185
186 def _set_config(self, service_type, service_id, name, value):
187 opt_arg = '--{name} {value}'.format(name=name, value=value)
188 whom = '.'.join([service_type, service_id])
189 self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
190 'injectargs', opt_arg)
191
192
193 def cmd_exists_on_osds(self, cmd):
194 allremotes = self.ceph_manager.ctx.cluster.only(\
195 teuthology.is_type('osd', self.cluster)).remotes.keys()
196 allremotes = list(set(allremotes))
197 for remote in allremotes:
198 proc = remote.run(args=['type', cmd], wait=True,
199 check_status=False, stdout=StringIO(),
200 stderr=StringIO())
201 if proc.exitstatus != 0:
202 return False
203 return True
204
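# kill_osd stops a live osd and, when ceph-objectstore-tool testing is
# enabled, exercises the offline tool against a downed osd: list its PGs,
# export a randomly chosen PG to a file, remove it from the store, and
# possibly re-import it into the just-killed osd (copying the export file
# between hosts when the two osds live on different remotes).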
205 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
206 """
207 :param osd: Osd to be killed.
208 :param mark_down: Mark down if true.
209 :param mark_out: Mark out if true.
210 """
211 if osd is None:
212 osd = random.choice(self.live_osds)
213 self.log("Killing osd %s, live_osds are %s" % (str(osd),
214 str(self.live_osds)))
215 self.live_osds.remove(osd)
216 self.dead_osds.append(osd)
217 self.ceph_manager.kill_osd(osd)
218 if mark_down:
219 self.ceph_manager.mark_down_osd(osd)
220 if mark_out and osd in self.in_osds:
221 self.out_osd(osd)
222 if self.ceph_objectstore_tool:
223 self.log("Testing ceph-objectstore-tool on down osd")
224 remote = self.ceph_manager.find_remote('osd', osd)
225 FSPATH = self.ceph_manager.get_filepath()
226 JPATH = os.path.join(FSPATH, "journal")
227 exp_osd = imp_osd = osd
228 exp_remote = imp_remote = remote
229 # If an older osd is available we'll move a pg from there
230 if (len(self.dead_osds) > 1 and
231 random.random() < self.chance_move_pg):
232 exp_osd = random.choice(self.dead_osds[:-1])
233 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
234 if ('keyvaluestore_backend' in
235 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
236 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
237 "--data-path {fpath} --journal-path {jpath} "
238 "--type keyvaluestore "
239 "--log-file="
240 "/var/log/ceph/objectstore_tool.\\$pid.log ".
241 format(fpath=FSPATH, jpath=JPATH))
242 else:
243 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
244 "--data-path {fpath} --journal-path {jpath} "
245 "--log-file="
246 "/var/log/ceph/objectstore_tool.\\$pid.log ".
247 format(fpath=FSPATH, jpath=JPATH))
248 cmd = (prefix + "--op list-pgs").format(id=exp_osd)
249
250 # ceph-objectstore-tool might be temporarily absent during an
251 # upgrade - see http://tracker.ceph.com/issues/18014
252 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
253 while proceed():
254 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
255 wait=True, check_status=False, stdout=StringIO(),
256 stderr=StringIO())
257 if proc.exitstatus == 0:
258 break
259 log.debug("ceph-objectstore-tool binary not present, trying again")
260
261 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
262 # see http://tracker.ceph.com/issues/19556
263 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
264 while proceed():
265 proc = exp_remote.run(args=cmd, wait=True,
266 check_status=False,
267 stdout=StringIO(), stderr=StringIO())
268 if proc.exitstatus == 0:
269 break
270 elif proc.exitstatus == 1 and "OSD has the store locked" in proc.stderr.getvalue():
271 continue
272 else:
273 raise Exception("ceph-objectstore-tool: "
274 "exp list-pgs failure with status {ret}".
275 format(ret=proc.exitstatus))
276
277 pgs = proc.stdout.getvalue().split('\n')[:-1]
278 if len(pgs) == 0:
279 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
280 return
281 pg = random.choice(pgs)
282 exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
283 exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
284 exp_path = os.path.join(exp_path,
285 "exp.{pg}.{id}".format(
286 pg=pg,
287 id=exp_osd))
288 # export
289 cmd = prefix + "--op export --pgid {pg} --file {file}"
290 cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
291 proc = exp_remote.run(args=cmd)
292 if proc.exitstatus:
293 raise Exception("ceph-objectstore-tool: "
294 "export failure with status {ret}".
295 format(ret=proc.exitstatus))
296 # remove
297 cmd = prefix + "--op remove --pgid {pg}"
298 cmd = cmd.format(id=exp_osd, pg=pg)
299 proc = exp_remote.run(args=cmd)
300 if proc.exitstatus:
301 raise Exception("ceph-objectstore-tool: "
302 "remove failure with status {ret}".
303 format(ret=proc.exitstatus))
304 # If there are at least 2 dead osds we might move the pg
305 if exp_osd != imp_osd:
306 # If pg isn't already on this osd, then we will move it there
307 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
308 proc = imp_remote.run(args=cmd, wait=True,
309 check_status=False, stdout=StringIO())
310 if proc.exitstatus:
311 raise Exception("ceph-objectstore-tool: "
312 "imp list-pgs failure with status {ret}".
313 format(ret=proc.exitstatus))
314 pgs = proc.stdout.getvalue().split('\n')[:-1]
315 if pg not in pgs:
316 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
317 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
318 if imp_remote != exp_remote:
319 # Copy export file to the other machine
320 self.log("Transfer export file from {srem} to {trem}".
321 format(srem=exp_remote, trem=imp_remote))
322 tmpexport = Remote.get_file(exp_remote, exp_path)
323 Remote.put_file(imp_remote, tmpexport, exp_path)
324 os.remove(tmpexport)
325 else:
326 # Can't move the pg after all
327 imp_osd = exp_osd
328 imp_remote = exp_remote
329 # import
330 cmd = (prefix + "--op import --file {file}")
331 cmd = cmd.format(id=imp_osd, file=exp_path)
332 proc = imp_remote.run(args=cmd, wait=True, check_status=False,
333 stderr=StringIO())
334 if proc.exitstatus == 1:
335 bogosity = "The OSD you are using is older than the exported PG"
336 if bogosity in proc.stderr.getvalue():
337 self.log("OSD older than exported PG"
338 "...ignored")
339 elif proc.exitstatus == 10:
340 self.log("Pool went away before processing an import"
341 "...ignored")
342 elif proc.exitstatus == 11:
343 self.log("Attempt to import an incompatible export"
344 "...ignored")
345 elif proc.exitstatus:
346 raise Exception("ceph-objectstore-tool: "
347 "import failure with status {ret}".
348 format(ret=proc.exitstatus))
349 cmd = "rm -f {file}".format(file=exp_path)
350 exp_remote.run(args=cmd)
351 if imp_remote != exp_remote:
352 imp_remote.run(args=cmd)
353
354 # apply low split settings to each pool
355 for pool in self.ceph_manager.list_pools():
356 no_sudo_prefix = prefix[5:]
357 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
358 "--filestore-split-multiple 1' sudo -E "
359 + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
360 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
361 output = proc.stderr.getvalue()
362 if 'Couldn\'t find pool' in output:
363 continue
364 if proc.exitstatus:
365 raise Exception("ceph-objectstore-tool apply-layout-settings"
366 " failed with {status}".format(status=proc.exitstatus))
367
368 def rm_past_intervals(self, osd=None):
369 """
370 :param osd: Osd whose PG should have its past intervals removed
371 """
372 if self.test_rm_past_intervals:
373 if osd is None:
374 osd = random.choice(self.dead_osds)
375 self.log("Use ceph_objectstore_tool to remove past intervals")
376 remote = self.ceph_manager.find_remote('osd', osd)
377 FSPATH = self.ceph_manager.get_filepath()
378 JPATH = os.path.join(FSPATH, "journal")
379 if ('keyvaluestore_backend' in
380 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
381 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
382 "--data-path {fpath} --journal-path {jpath} "
383 "--type keyvaluestore "
384 "--log-file="
385 "/var/log/ceph/objectstore_tool.\\$pid.log ".
386 format(fpath=FSPATH, jpath=JPATH))
387 else:
388 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
389 "--data-path {fpath} --journal-path {jpath} "
390 "--log-file="
391 "/var/log/ceph/objectstore_tool.\\$pid.log ".
392 format(fpath=FSPATH, jpath=JPATH))
393 cmd = (prefix + "--op list-pgs").format(id=osd)
394 proc = remote.run(args=cmd, wait=True,
395 check_status=False, stdout=StringIO())
396 if proc.exitstatus:
397 raise Exception("ceph_objectstore_tool: "
398 "exp list-pgs failure with status {ret}".
399 format(ret=proc.exitstatus))
400 pgs = proc.stdout.getvalue().split('\n')[:-1]
401 if len(pgs) == 0:
402 self.log("No PGs found for osd.{osd}".format(osd=osd))
403 return
404 pg = random.choice(pgs)
405 cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
406 format(id=osd, pg=pg)
407 proc = remote.run(args=cmd)
408 if proc.exitstatus:
409 raise Exception("ceph_objectstore_tool: "
410 "rm-past-intervals failure with status {ret}".
411 format(ret=proc.exitstatus))
412
413 def blackhole_kill_osd(self, osd=None):
414 """
415 If all else fails, kill the osd.
416 :param osd: Osd to be killed.
417 """
418 if osd is None:
419 osd = random.choice(self.live_osds)
420 self.log("Blackholing and then killing osd %s, live_osds are %s" %
421 (str(osd), str(self.live_osds)))
422 self.live_osds.remove(osd)
423 self.dead_osds.append(osd)
424 self.ceph_manager.blackhole_kill_osd(osd)
425
426 def revive_osd(self, osd=None, skip_admin_check=False):
427 """
428 Revive the osd.
429 :param osd: Osd to be revived.
430 """
431 if osd is None:
432 osd = random.choice(self.dead_osds)
433 self.log("Reviving osd %s" % (str(osd),))
434 self.ceph_manager.revive_osd(
435 osd,
436 self.revive_timeout,
437 skip_admin_check=skip_admin_check)
438 self.dead_osds.remove(osd)
439 self.live_osds.append(osd)
440 if self.random_eio > 0 and osd == self.rerrosd:
441 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
442 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
443 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
444 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
445
446
447 def out_osd(self, osd=None):
448 """
449 Mark the osd out
450 :param osd: Osd to be marked.
451 """
452 if osd is None:
453 osd = random.choice(self.in_osds)
454 self.log("Removing osd %s, in_osds are: %s" %
455 (str(osd), str(self.in_osds)))
456 self.ceph_manager.mark_out_osd(osd)
457 self.in_osds.remove(osd)
458 self.out_osds.append(osd)
459
460 def in_osd(self, osd=None):
461 """
462 Mark the osd in
463 :param osd: Osd to be marked.
464 """
465 if osd is None:
466 osd = random.choice(self.out_osds)
467 if osd in self.dead_osds:
468 return self.revive_osd(osd)
469 self.log("Adding osd %s" % (str(osd),))
470 self.out_osds.remove(osd)
471 self.in_osds.append(osd)
472 self.ceph_manager.mark_in_osd(osd)
473 self.log("Added osd %s" % (str(osd),))
474
475 def reweight_osd_or_by_util(self, osd=None):
476 """
477 Reweight an osd that is in
478 :param osd: Osd to be marked.
479 """
480 if osd is not None or random.choice([True, False]):
481 if osd is None:
482 osd = random.choice(self.in_osds)
483 val = random.uniform(.1, 1.0)
484 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
485 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
486 str(osd), str(val))
487 else:
488 # do it several times, the option space is large
489 for i in range(5):
490 options = {
491 'max_change': random.choice(['0.05', '1.0', '3.0']),
492 'overage': random.choice(['110', '1000']),
493 'type': random.choice([
494 'reweight-by-utilization',
495 'test-reweight-by-utilization']),
496 }
497 self.log("Reweighting by: %s"%(str(options),))
498 self.ceph_manager.raw_cluster_cmd(
499 'osd',
500 options['type'],
501 options['overage'],
502 options['max_change'])
503
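# primary_affinity: 50% of the time pick a uniform random affinity in
# [0, 1), otherwise a coin flip between exactly 1 and exactly 0, then apply
# it with 'ceph osd primary-affinity <osd> <pa>'.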
504 def primary_affinity(self, osd=None):
505 if osd is None:
506 osd = random.choice(self.in_osds)
507 if random.random() >= .5:
508 pa = random.random()
509 elif random.random() >= .5:
510 pa = 1
511 else:
512 pa = 0
513 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
514 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
515 str(osd), str(pa))
516
517 def thrash_cluster_full(self):
518 """
519 Set and unset cluster full condition
520 """
521 self.log('Setting full ratio to .001')
522 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
523 time.sleep(1)
524 self.log('Setting full ratio back to .95')
525 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
526
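# thrash_pg_upmap / thrash_pg_upmap_items: roughly 70% of the time install a
# random pg_upmap (or pg_upmap_items) entry for a randomly chosen PG, sizing
# the osd list from the pool's replica count in the osd dump; otherwise
# remove an existing entry if there is one.  CommandFailedError is swallowed
# so thrashing keeps going.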
527 def thrash_pg_upmap(self):
528 """
529 Install or remove random pg_upmap entries in OSDMap
530 """
531 from random import shuffle
532 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
533 j = json.loads(out)
534 self.log('j is %s' % j)
535 try:
536 if random.random() >= .3:
537 pgs = self.ceph_manager.get_pg_stats()
538 pg = random.choice(pgs)
539 pgid = str(pg['pgid'])
540 poolid = int(pgid.split('.')[0])
541 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
542 if len(sizes) == 0:
543 return
544 n = sizes[0]
545 osds = self.in_osds + self.out_osds
546 shuffle(osds)
547 osds = osds[0:n]
548 self.log('Setting %s to %s' % (pgid, osds))
549 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
550 self.log('cmd %s' % cmd)
551 self.ceph_manager.raw_cluster_cmd(*cmd)
552 else:
553 m = j['pg_upmap']
554 if len(m) > 0:
555 shuffle(m)
556 pg = m[0]['pgid']
557 self.log('Clearing pg_upmap on %s' % pg)
558 self.ceph_manager.raw_cluster_cmd(
559 'osd',
560 'rm-pg-upmap',
561 pg)
562 else:
563 self.log('No pg_upmap entries; doing nothing')
564 except CommandFailedError:
565 self.log('Failed to rm-pg-upmap, ignoring')
566
567 def thrash_pg_upmap_items(self):
568 """
569 Install or remove random pg_upmap_items entries in OSDMap
570 """
571 from random import shuffle
572 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
573 j = json.loads(out)
574 self.log('j is %s' % j)
575 try:
576 if random.random() >= .3:
577 pgs = self.ceph_manager.get_pg_stats()
578 pg = random.choice(pgs)
579 pgid = str(pg['pgid'])
580 poolid = int(pgid.split('.')[0])
581 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
582 if len(sizes) == 0:
583 return
584 n = sizes[0]
585 osds = self.in_osds + self.out_osds
586 shuffle(osds)
587 osds = osds[0:n*2]
588 self.log('Setting %s to %s' % (pgid, osds))
589 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
590 self.log('cmd %s' % cmd)
591 self.ceph_manager.raw_cluster_cmd(*cmd)
592 else:
593 m = j['pg_upmap_items']
594 if len(m) > 0:
595 shuffle(m)
596 pg = m[0]['pgid']
597 self.log('Clearing pg_upmap on %s' % pg)
598 self.ceph_manager.raw_cluster_cmd(
599 'osd',
600 'rm-pg-upmap-items',
601 pg)
602 else:
603 self.log('No pg_upmap entries; doing nothing')
604 except CommandFailedError:
605 self.log('Failed to rm-pg-upmap-items, ignoring')
606
607 def force_recovery(self):
608 """
609 Force recovery or backfill on a random subset of PGs
610 """
611 backfill = random.random() >= 0.5
612 j = self.ceph_manager.get_pgids_to_force(backfill)
613 if j:
614 if backfill:
615 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
616 else:
617 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
618
619 def cancel_force_recovery(self):
620 """
621 Cancel forced recovery or backfill on a random subset of PGs
622 """
623 backfill = random.random() >= 0.5
624 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
625 if j:
626 if backfill:
627 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
628 else:
629 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
630
631 def force_cancel_recovery(self):
632 """
633 Force or cancel forcing recovery
634 """
635 if random.random() >= 0.4:
636 self.force_recovery()
637 else:
638 self.cancel_force_recovery()
639
640 def all_up(self):
641 """
642 Make sure all osds are up and not out.
643 """
644 while len(self.dead_osds) > 0:
645 self.log("reviving osd")
646 self.revive_osd()
647 while len(self.out_osds) > 0:
648 self.log("inning osd")
649 self.in_osd()
650
651 def all_up_in(self):
652 """
653 Make sure all osds are up and fully in.
654 """
655 self.all_up()
656 for osd in self.live_osds:
657 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
658 str(osd), str(1))
659 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
660 str(osd), str(1))
661
662 def do_join(self):
663 """
664 Break out of this Ceph loop
665 """
666 self.stopping = True
667 self.thread.get()
668 if self.sighup_delay:
669 self.log("joining the do_sighup greenlet")
670 self.sighup_thread.get()
671 if self.optrack_toggle_delay:
672 self.log("joining the do_optrack_toggle greenlet")
673 self.optrack_toggle_thread.join()
674 if self.dump_ops_enable == "true":
675 self.log("joining the do_dump_ops greenlet")
676 self.dump_ops_thread.join()
677 if self.noscrub_toggle_delay:
678 self.log("joining the do_noscrub_toggle greenlet")
679 self.noscrub_toggle_thread.join()
680
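# grow_pool bumps pg_num on a random pool and records it in
# pools_to_fix_pgp_num so a later fix_pgp_num call (or the cleanup at the
# end of do_thrash) can raise pgp_num to match.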
681 def grow_pool(self):
682 """
683 Increase the number of PGs (pg_num) in a pool
684 """
685 pool = self.ceph_manager.get_pool()
686 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
687 self.log("Growing pool %s" % (pool,))
688 if self.ceph_manager.expand_pool(pool,
689 self.config.get('pool_grow_by', 10),
690 self.max_pgs):
691 self.pools_to_fix_pgp_num.add(pool)
692
693 def fix_pgp_num(self, pool=None):
694 """
695 Fix pgp_num so it matches pg_num for the pool.
696 """
697 if pool is None:
698 pool = self.ceph_manager.get_pool()
699 force = False
700 else:
701 force = True
702 self.log("fixing pg num pool %s" % (pool,))
703 if self.ceph_manager.set_pool_pgpnum(pool, force):
704 self.pools_to_fix_pgp_num.discard(pool)
705
706 def test_pool_min_size(self):
707 """
708 Kill and revive all osds except one.
709 """
710 self.log("test_pool_min_size")
711 self.all_up()
712 self.ceph_manager.wait_for_recovery(
713 timeout=self.config.get('timeout')
714 )
715 the_one = random.choice(self.in_osds)
716 self.log("Killing everyone but %s", the_one)
717 to_kill = filter(lambda x: x != the_one, self.in_osds)
718 [self.kill_osd(i) for i in to_kill]
719 [self.out_osd(i) for i in to_kill]
720 time.sleep(self.config.get("test_pool_min_size_time", 10))
721 self.log("Killing %s" % (the_one,))
722 self.kill_osd(the_one)
723 self.out_osd(the_one)
724 self.log("Reviving everyone but %s" % (the_one,))
725 [self.revive_osd(i) for i in to_kill]
726 [self.in_osd(i) for i in to_kill]
727 self.log("Revived everyone but %s" % (the_one,))
728 self.log("Waiting for clean")
729 self.ceph_manager.wait_for_recovery(
730 timeout=self.config.get('timeout')
731 )
732
733 def inject_pause(self, conf_key, duration, check_after, should_be_down):
734 """
735 Pause injection testing. Check for osd being down when finished.
736 """
737 the_one = random.choice(self.live_osds)
738 self.log("inject_pause on {osd}".format(osd=the_one))
739 self.log(
740 "Testing {key} pause injection for duration {duration}".format(
741 key=conf_key,
742 duration=duration
743 ))
744 self.log(
745 "Checking after {after}, should_be_down={shouldbedown}".format(
746 after=check_after,
747 shouldbedown=should_be_down
748 ))
749 self.ceph_manager.set_config(the_one, **{conf_key: duration})
750 if not should_be_down:
751 return
752 time.sleep(check_after)
753 status = self.ceph_manager.get_osd_status()
754 assert the_one in status['down']
755 time.sleep(duration - check_after + 20)
756 status = self.ceph_manager.get_osd_status()
757 assert the_one not in status['down']
758
759 def test_backfill_full(self):
760 """
761 Test backfills stopping when the replica fills up.
762
763 First, use injectfull admin command to simulate a now full
764 osd by setting it to 0 on all of the OSDs.
765
766 Second, on a random subset, set
767 osd_debug_skip_full_check_in_backfill_reservation to force
768 the more complicated check in do_scan to be exercised.
769
770 Then, verify that all backfills stop.
771 """
772 self.log("injecting backfill full")
773 for i in self.live_osds:
774 self.ceph_manager.set_config(
775 i,
776 osd_debug_skip_full_check_in_backfill_reservation=
777 random.choice(['false', 'true']))
778 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
779 check_status=True, timeout=30, stdout=DEVNULL)
780 for i in range(30):
781 status = self.ceph_manager.compile_pg_status()
782 if 'backfill' not in status.keys():
783 break
784 self.log(
785 "waiting for {still_going} backfills".format(
786 still_going=status.get('backfill')))
787 time.sleep(1)
788 assert('backfill' not in self.ceph_manager.compile_pg_status().keys())
789 for i in self.live_osds:
790 self.ceph_manager.set_config(
791 i,
792 osd_debug_skip_full_check_in_backfill_reservation='false')
793 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
794 check_status=True, timeout=30, stdout=DEVNULL)
795
796 def test_map_discontinuity(self):
797 """
798 1) Allows the osds to recover
799 2) kills an osd
800 3) allows the remaining osds to recover
801 4) waits for some time
802 5) revives the osd
803 This sequence should cause the revived osd to have to handle
804 a map gap since the mons would have trimmed
805 """
806 while len(self.in_osds) < (self.minin + 1):
807 self.in_osd()
808 self.log("Waiting for recovery")
809 self.ceph_manager.wait_for_all_osds_up(
810 timeout=self.config.get('timeout')
811 )
812 # now we wait 20s for the pg status to change, if it takes longer,
813 # the test *should* fail!
814 time.sleep(20)
815 self.ceph_manager.wait_for_clean(
816 timeout=self.config.get('timeout')
817 )
818
819 # now we wait 20s for the backfill replicas to hear about the clean
820 time.sleep(20)
821 self.log("Recovered, killing an osd")
822 self.kill_osd(mark_down=True, mark_out=True)
823 self.log("Waiting for clean again")
824 self.ceph_manager.wait_for_clean(
825 timeout=self.config.get('timeout')
826 )
827 self.log("Waiting for trim")
828 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
829 self.revive_osd()
830
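# choose_action builds a list of (callable, weight) pairs from the config
# and samples one by drawing val in [0, sum(weights)) and walking the list
# while subtracting weights.  For example, with [(out_osd, 1.0),
# (kill_osd, 0.4)] a draw of 1.1 skips out_osd (1.1 >= 1.0) and, after
# subtracting, selects kill_osd (0.1 < 0.4).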
831 def choose_action(self):
832 """
833 Random action selector.
834 """
835 chance_down = self.config.get('chance_down', 0.4)
836 chance_test_min_size = self.config.get('chance_test_min_size', 0)
837 chance_test_backfill_full = \
838 self.config.get('chance_test_backfill_full', 0)
839 if isinstance(chance_down, int):
840 chance_down = float(chance_down) / 100
841 minin = self.minin
842 minout = self.config.get("min_out", 0)
843 minlive = self.config.get("min_live", 2)
844 mindead = self.config.get("min_dead", 0)
845
846 self.log('choose_action: min_in %d min_out '
847 '%d min_live %d min_dead %d' %
848 (minin, minout, minlive, mindead))
849 actions = []
850 if len(self.in_osds) > minin:
851 actions.append((self.out_osd, 1.0,))
852 if len(self.live_osds) > minlive and chance_down > 0:
853 actions.append((self.kill_osd, chance_down,))
854 if len(self.dead_osds) > 1:
855 actions.append((self.rm_past_intervals, 1.0,))
856 if len(self.out_osds) > minout:
857 actions.append((self.in_osd, 1.7,))
858 if len(self.dead_osds) > mindead:
859 actions.append((self.revive_osd, 1.0,))
860 if self.config.get('thrash_primary_affinity', True):
861 actions.append((self.primary_affinity, 1.0,))
862 actions.append((self.reweight_osd_or_by_util,
863 self.config.get('reweight_osd', .5),))
864 actions.append((self.grow_pool,
865 self.config.get('chance_pgnum_grow', 0),))
866 actions.append((self.fix_pgp_num,
867 self.config.get('chance_pgpnum_fix', 0),))
868 actions.append((self.test_pool_min_size,
869 chance_test_min_size,))
870 actions.append((self.test_backfill_full,
871 chance_test_backfill_full,))
872 if self.chance_thrash_cluster_full > 0:
873 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
874 if self.chance_thrash_pg_upmap > 0:
875 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
876 if self.chance_thrash_pg_upmap_items > 0:
877 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
878 if self.chance_force_recovery > 0:
879 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
880
881 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
882 for scenario in [
883 (lambda:
884 self.inject_pause(key,
885 self.config.get('pause_short', 3),
886 0,
887 False),
888 self.config.get('chance_inject_pause_short', 1),),
889 (lambda:
890 self.inject_pause(key,
891 self.config.get('pause_long', 80),
892 self.config.get('pause_check_after', 70),
893 True),
894 self.config.get('chance_inject_pause_long', 0),)]:
895 actions.append(scenario)
896
897 total = sum([y for (x, y) in actions])
898 val = random.uniform(0, total)
899 for (action, prob) in actions:
900 if val < prob:
901 return action
902 val -= prob
903 return None
904
905 def log_exc(func):
906 @wraps(func)
907 def wrapper(self):
908 try:
909 return func(self)
910 except:
911 self.log(traceback.format_exc())
912 raise
913 return wrapper
914
915 @log_exc
916 def do_sighup(self):
917 """
918 Loops and sends signal.SIGHUP to a random live osd.
919
920 Loop delay is controlled by the config value sighup_delay.
921 """
922 delay = float(self.sighup_delay)
923 self.log("starting do_sighup with a delay of {0}".format(delay))
924 while not self.stopping:
925 osd = random.choice(self.live_osds)
926 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
927 time.sleep(delay)
928
929 @log_exc
930 def do_optrack_toggle(self):
931 """
932 Loops and toggle op tracking to all osds.
933
934 Loop delay is controlled by the config value optrack_toggle_delay.
935 """
936 delay = float(self.optrack_toggle_delay)
937 osd_state = "true"
938 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
939 while not self.stopping:
940 if osd_state == "true":
941 osd_state = "false"
942 else:
943 osd_state = "true"
944 self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
945 'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
946 gevent.sleep(delay)
947
948 @log_exc
949 def do_dump_ops(self):
950 """
951 Loops and does op dumps on all osds
952 """
953 self.log("starting do_dump_ops")
954 while not self.stopping:
955 for osd in self.live_osds:
956 # Ignore errors because live_osds is in flux
957 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
958 check_status=False, timeout=30, stdout=DEVNULL)
959 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
960 check_status=False, timeout=30, stdout=DEVNULL)
961 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
962 check_status=False, timeout=30, stdout=DEVNULL)
963 gevent.sleep(0)
964
965 @log_exc
966 def do_noscrub_toggle(self):
967 """
968 Loops and toggle noscrub flags
969
970 Loop delay is controlled by the config value noscrub_toggle_delay.
971 """
972 delay = float(self.noscrub_toggle_delay)
973 scrub_state = "none"
974 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
975 while not self.stopping:
976 if scrub_state == "none":
977 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
978 scrub_state = "noscrub"
979 elif scrub_state == "noscrub":
980 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
981 scrub_state = "both"
982 elif scrub_state == "both":
983 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
984 scrub_state = "nodeep-scrub"
985 else:
986 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
987 scrub_state = "none"
988 gevent.sleep(delay)
989 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
990 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
991
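# do_thrash is the main thrashing greenlet: each iteration it may first let
# the cluster settle (with probability op_delay/clean_interval: revive osds
# down to max_dead, reset weights, then either wait for recovery or run
# test_map_discontinuity), may start a scrub, and always performs one
# weighted-random action from choose_action before sleeping op_delay seconds.
# On shutdown it clears the injected read errors, fixes pgp_num on pools it
# grew, restores the saved mon options and brings all osds up and in.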
992 @log_exc
993 def do_thrash(self):
994 """
995 Loop to select random actions to thrash ceph manager with.
996 """
997 cleanint = self.config.get("clean_interval", 60)
998 scrubint = self.config.get("scrub_interval", -1)
999 maxdead = self.config.get("max_dead", 0)
1000 delay = self.config.get("op_delay", 5)
1001 self.rerrosd = self.live_osds[0]
1002 if self.random_eio > 0:
1003 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1004 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
1005 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1006 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
1007 self.log("starting do_thrash")
1008 while not self.stopping:
1009 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1010 "out_osds: ", self.out_osds,
1011 "dead_osds: ", self.dead_osds,
1012 "live_osds: ", self.live_osds]]
1013 self.log(" ".join(to_log))
1014 if random.uniform(0, 1) < (float(delay) / cleanint):
1015 while len(self.dead_osds) > maxdead:
1016 self.revive_osd()
1017 for osd in self.in_osds:
1018 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1019 str(osd), str(1))
1020 if random.uniform(0, 1) < float(
1021 self.config.get('chance_test_map_discontinuity', 0)):
1022 self.test_map_discontinuity()
1023 else:
1024 self.ceph_manager.wait_for_recovery(
1025 timeout=self.config.get('timeout')
1026 )
1027 time.sleep(self.clean_wait)
1028 if scrubint > 0:
1029 if random.uniform(0, 1) < (float(delay) / scrubint):
1030 self.log('Scrubbing while thrashing being performed')
1031 Scrubber(self.ceph_manager, self.config)
1032 self.choose_action()()
1033 time.sleep(delay)
1034 if self.random_eio > 0:
1035 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1036 'injectargs', '--', '--filestore_debug_random_read_err=0.0')
1037 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1038 'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
1039 for pool in list(self.pools_to_fix_pgp_num):
1040 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1041 self.fix_pgp_num(pool)
1042 self.pools_to_fix_pgp_num.clear()
1043 for service, opt, saved_value in self.saved_options:
1044 self._set_config(service, '*', opt, saved_value)
1045 self.saved_options = []
1046 self.all_up_in()
1047
1048
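# ObjectStoreTool wraps running ceph-objectstore-tool against a stopped osd:
# run() kills the osd, builds a bash command (optionally resolving the
# target object's JSON identifier first via '--op list'), executes it, and
# revives the osd afterwards.  It is normally used via
# CephManager.objectstore_tool(pool, options, args, ...).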
1049 class ObjectStoreTool:
1050
1051 def __init__(self, manager, pool, **kwargs):
1052 self.manager = manager
1053 self.pool = pool
1054 self.osd = kwargs.get('osd', None)
1055 self.object_name = kwargs.get('object_name', None)
1056 self.do_revive = kwargs.get('do_revive', True)
1057 if self.osd and self.pool and self.object_name:
1058 if self.osd == "primary":
1059 self.osd = self.manager.get_object_primary(self.pool,
1060 self.object_name)
1061 assert self.osd
1062 if self.object_name:
1063 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1064 self.object_name,
1065 self.osd)
1066 self.remote = self.manager.ctx.\
1067 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1068 path = self.manager.get_filepath().format(id=self.osd)
1069 self.paths = ("--data-path {path} --journal-path {path}/journal".
1070 format(path=path))
1071
1072 def build_cmd(self, options, args, stdin):
1073 lines = []
1074 if self.object_name:
1075 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1076 "{paths} --pgid {pgid} --op list |"
1077 "grep '\"oid\":\"{name}\"')".
1078 format(paths=self.paths,
1079 pgid=self.pgid,
1080 name=self.object_name))
1081 args = '"$object" ' + args
1082 options += " --pgid {pgid}".format(pgid=self.pgid)
1083 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1084 format(paths=self.paths,
1085 args=args,
1086 options=options))
1087 if stdin:
1088 cmd = ("echo {payload} | base64 --decode | {cmd}".
1089 format(payload=base64.b64encode(stdin),
1090 cmd=cmd))
1091 lines.append(cmd)
1092 return "\n".join(lines)
1093
1094 def run(self, options, args, stdin=None, stdout=None):
1095 if stdout is None:
1096 stdout = StringIO()
1097 self.manager.kill_osd(self.osd)
1098 cmd = self.build_cmd(options, args, stdin)
1099 self.manager.log(cmd)
1100 try:
1101 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1102 check_status=False,
1103 stdout=stdout,
1104 stderr=StringIO())
1105 proc.wait()
1106 if proc.exitstatus != 0:
1107 self.manager.log("failed with " + str(proc.exitstatus))
1108 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1109 raise Exception(error)
1110 finally:
1111 if self.do_revive:
1112 self.manager.revive_osd(self.osd)
1113 self.manager.wait_till_osd_is_up(self.osd, 300)
1114
1115
1116 class CephManager:
1117 """
1118 Ceph manager object.
1119 Contains several local functions that form a bulk of this module.
1120
1121 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1122 the same name.
1123 """
1124
1125 REPLICATED_POOL = 1
1126 ERASURE_CODED_POOL = 3
1127
1128 def __init__(self, controller, ctx=None, config=None, logger=None,
1129 cluster='ceph'):
1130 self.lock = threading.RLock()
1131 self.ctx = ctx
1132 self.config = config
1133 self.controller = controller
1134 self.next_pool_id = 0
1135 self.cluster = cluster
1136 if (logger):
1137 self.log = lambda x: logger.info(x)
1138 else:
1139 def tmp(x):
1140 """
1141 implement log behavior.
1142 """
1143 print x
1144 self.log = tmp
1145 if self.config is None:
1146 self.config = dict()
1147 pools = self.list_pools()
1148 self.pools = {}
1149 for pool in pools:
1150 # we may race with a pool deletion; ignore failures here
1151 try:
1152 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1153 except CommandFailedError:
1154 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1155
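# raw_cluster_cmd / raw_cluster_cmd_result run the 'ceph' CLI on the
# controller node under adjust-ulimits and ceph-coverage with a 120s
# timeout; the former returns stdout, the latter only the exit status.
# Typical use, as seen throughout this module:
#     out = self.raw_cluster_cmd('osd', 'dump', '--format=json')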
1156 def raw_cluster_cmd(self, *args):
1157 """
1158 Run a 'ceph' command against the cluster and return its stdout.
1159 """
1160 testdir = teuthology.get_testdir(self.ctx)
1161 ceph_args = [
1162 'sudo',
1163 'adjust-ulimits',
1164 'ceph-coverage',
1165 '{tdir}/archive/coverage'.format(tdir=testdir),
1166 'timeout',
1167 '120',
1168 'ceph',
1169 '--cluster',
1170 self.cluster,
1171 ]
1172 ceph_args.extend(args)
1173 proc = self.controller.run(
1174 args=ceph_args,
1175 stdout=StringIO(),
1176 )
1177 return proc.stdout.getvalue()
1178
1179 def raw_cluster_cmd_result(self, *args):
1180 """
1181 Run a 'ceph' command against the cluster and return its exit status.
1182 """
1183 testdir = teuthology.get_testdir(self.ctx)
1184 ceph_args = [
1185 'sudo',
1186 'adjust-ulimits',
1187 'ceph-coverage',
1188 '{tdir}/archive/coverage'.format(tdir=testdir),
1189 'timeout',
1190 '120',
1191 'ceph',
1192 '--cluster',
1193 self.cluster,
1194 ]
1195 ceph_args.extend(args)
1196 proc = self.controller.run(
1197 args=ceph_args,
1198 check_status=False,
1199 )
1200 return proc.exitstatus
1201
1202 def run_ceph_w(self):
1203 """
1204 Execute "ceph -w" in the background with stdout connected to a StringIO,
1205 and return the RemoteProcess.
1206 """
1207 return self.controller.run(
1208 args=["sudo",
1209 "daemon-helper",
1210 "kill",
1211 "ceph",
1212 '--cluster',
1213 self.cluster,
1214 "-w"],
1215 wait=False, stdout=StringIO(), stdin=run.PIPE)
1216
1217 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1218 """
1219 Flush pg stats from a list of OSD ids, ensuring they are reflected
1220 all the way to the monitor. Luminous and later only.
1221
1222 :param osds: list of OSDs to flush
1223 :param no_wait: list of OSDs not to wait for seq id. by default, we
1224 wait for all specified osds, but some of them could be
1225 moved out of osdmap, so we cannot get their updated
1226 stat seq from monitor anymore. in that case, you need
1227 to pass a blacklist.
1228 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1229 it. (5 min by default)
1230 """
1231 seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
1232 for osd in osds}
1233 if not wait_for_mon:
1234 return
1235 if no_wait is None:
1236 no_wait = []
1237 for osd, need in seq.iteritems():
1238 if osd in no_wait:
1239 continue
1240 got = 0
1241 while wait_for_mon > 0:
1242 got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
1243 self.log('need seq {need} got {got} for osd.{osd}'.format(
1244 need=need, got=got, osd=osd))
1245 if got >= need:
1246 break
1247 A_WHILE = 1
1248 time.sleep(A_WHILE)
1249 wait_for_mon -= A_WHILE
1250 else:
1251 raise Exception('timed out waiting for mon to be updated with '
1252 'osd.{osd}: {got} < {need}'.
1253 format(osd=osd, got=got, need=need))
1254
1255 def flush_all_pg_stats(self):
1256 self.flush_pg_stats(range(len(self.get_osd_dump())))
1257
1258 def do_rados(self, remote, cmd, check_status=True):
1259 """
1260 Execute a remote rados command.
1261 """
1262 testdir = teuthology.get_testdir(self.ctx)
1263 pre = [
1264 'adjust-ulimits',
1265 'ceph-coverage',
1266 '{tdir}/archive/coverage'.format(tdir=testdir),
1267 'rados',
1268 '--cluster',
1269 self.cluster,
1270 ]
1271 pre.extend(cmd)
1272 proc = remote.run(
1273 args=pre,
1274 wait=True,
1275 check_status=check_status
1276 )
1277 return proc
1278
1279 def rados_write_objects(self, pool, num_objects, size,
1280 timelimit, threads, cleanup=False):
1281 """
1282 Write rados objects
1283 Threads not used yet.
1284 """
1285 args = [
1286 '-p', pool,
1287 '--num-objects', num_objects,
1288 '-b', size,
1289 'bench', timelimit,
1290 'write'
1291 ]
1292 if not cleanup:
1293 args.append('--no-cleanup')
1294 return self.do_rados(self.controller, map(str, args))
1295
1296 def do_put(self, pool, obj, fname, namespace=None):
1297 """
1298 Implement rados put operation
1299 """
1300 args = ['-p', pool]
1301 if namespace is not None:
1302 args += ['-N', namespace]
1303 args += [
1304 'put',
1305 obj,
1306 fname
1307 ]
1308 return self.do_rados(
1309 self.controller,
1310 args,
1311 check_status=False
1312 ).exitstatus
1313
1314 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1315 """
1316 Implement rados get operation
1317 """
1318 args = ['-p', pool]
1319 if namespace is not None:
1320 args += ['-N', namespace]
1321 args += [
1322 'get',
1323 obj,
1324 fname
1325 ]
1326 return self.do_rados(
1327 self.controller,
1328 args,
1329 check_status=False
1330 ).exitstatus
1331
1332 def do_rm(self, pool, obj, namespace=None):
1333 """
1334 Implement rados rm operation
1335 """
1336 args = ['-p', pool]
1337 if namespace is not None:
1338 args += ['-N', namespace]
1339 args += [
1340 'rm',
1341 obj
1342 ]
1343 return self.do_rados(
1344 self.controller,
1345 args,
1346 check_status=False
1347 ).exitstatus
1348
1349 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1350 if stdout is None:
1351 stdout = StringIO()
1352 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1353
1354 def find_remote(self, service_type, service_id):
1355 """
1356 Get the Remote for the host where a particular service runs.
1357
1358 :param service_type: 'mds', 'osd', 'client'
1359 :param service_id: The second part of a role, e.g. '0' for
1360 the role 'client.0'
1361 :return: a Remote instance for the host where the
1362 requested role is placed
1363 """
1364 return get_remote(self.ctx, self.cluster,
1365 service_type, service_id)
1366
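# admin_socket drives a daemon's /var/run/ceph/<cluster>-<type>.<id>.asok
# socket via 'ceph --admin-daemon'.  Illustrative use, matching the helpers
# below:
#     self.admin_socket('osd', 3, ['config', 'show'])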
1367 def admin_socket(self, service_type, service_id,
1368 command, check_status=True, timeout=0, stdout=None):
1369 """
1370 Remotely start up ceph specifying the admin socket
1371 :param command: a list of words to use as the command
1372 to the admin socket
1373 """
1374 if stdout is None:
1375 stdout = StringIO()
1376 testdir = teuthology.get_testdir(self.ctx)
1377 remote = self.find_remote(service_type, service_id)
1378 args = [
1379 'sudo',
1380 'adjust-ulimits',
1381 'ceph-coverage',
1382 '{tdir}/archive/coverage'.format(tdir=testdir),
1383 'timeout',
1384 str(timeout),
1385 'ceph',
1386 '--cluster',
1387 self.cluster,
1388 '--admin-daemon',
1389 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1390 cluster=self.cluster,
1391 type=service_type,
1392 id=service_id),
1393 ]
1394 args.extend(command)
1395 return remote.run(
1396 args=args,
1397 stdout=stdout,
1398 wait=True,
1399 check_status=check_status
1400 )
1401
1402 def objectstore_tool(self, pool, options, args, **kwargs):
1403 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1404
1405 def get_pgid(self, pool, pgnum):
1406 """
1407 :param pool: pool name
1408 :param pgnum: pg number
1409 :returns: a string representing this pg.
1410 """
1411 poolnum = self.get_pool_num(pool)
1412 pg_str = "{poolnum}.{pgnum}".format(
1413 poolnum=poolnum,
1414 pgnum=pgnum)
1415 return pg_str
1416
1417 def get_pg_replica(self, pool, pgnum):
1418 """
1419 get replica for pool, pgnum (e.g. (data, 0) -> 0)
1420 """
1421 pg_str = self.get_pgid(pool, pgnum)
1422 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1423 j = json.loads('\n'.join(output.split('\n')[1:]))
1424 return int(j['acting'][-1])
1425 assert False
1426
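# wait_for_pg_stats is a decorator used on the pg-state checks below: it
# retries the wrapped method up to seven times on AssertionError, sleeping
# 1, 1, 2, 3, 5, 8 and 13 seconds after each failure so mon/mgr pg stats
# have time to catch up, and re-raises the last AssertionError if every
# attempt fails.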
1427 def wait_for_pg_stats(func):
1428 # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1429 # by default, and take the faulty injection in ms into consideration,
1430 # 12 seconds are more than enough
1431 delays = [1, 1, 2, 3, 5, 8, 13]
1432 @wraps(func)
1433 def wrapper(self, *args, **kwargs):
1434 exc = None
1435 for delay in delays:
1436 try:
1437 return func(self, *args, **kwargs)
1438 except AssertionError as e:
1439 time.sleep(delay)
1440 exc = e
1441 raise exc
1442 return wrapper
1443
1444 def get_pg_primary(self, pool, pgnum):
1445 """
1446 get primary for pool, pgnum (e.g. (data, 0) -> 0)
1447 """
1448 pg_str = self.get_pgid(pool, pgnum)
1449 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1450 j = json.loads('\n'.join(output.split('\n')[1:]))
1451 return int(j['acting'][0])
1452 assert False
1453
1454 def get_pool_num(self, pool):
1455 """
1456 get number for pool (e.g., data -> 2)
1457 """
1458 return int(self.get_pool_dump(pool)['pool'])
1459
1460 def list_pools(self):
1461 """
1462 list all pool names
1463 """
1464 osd_dump = self.get_osd_dump_json()
1465 self.log(osd_dump['pools'])
1466 return [str(i['pool_name']) for i in osd_dump['pools']]
1467
1468 def clear_pools(self):
1469 """
1470 remove all pools
1471 """
1472 [self.remove_pool(i) for i in self.list_pools()]
1473
1474 def kick_recovery_wq(self, osdnum):
1475 """
1476 Run kick_recovery_wq on cluster.
1477 """
1478 return self.raw_cluster_cmd(
1479 'tell', "osd.%d" % (int(osdnum),),
1480 'debug',
1481 'kick_recovery_wq',
1482 '0')
1483
1484 def wait_run_admin_socket(self, service_type,
1485 service_id, args=['version'], timeout=75, stdout=None):
1486 """
1487 If the admin_socket call succeeds, return. Otherwise wait
1488 five seconds and try again.
1489 """
1490 if stdout is None:
1491 stdout = StringIO()
1492 tries = 0
1493 while True:
1494 proc = self.admin_socket(service_type, service_id,
1495 args, check_status=False, stdout=stdout)
1496 if proc.exitstatus == 0:
1497 return proc
1498 else:
1499 tries += 1
1500 if (tries * 5) > timeout:
1501 raise Exception('timed out waiting for admin_socket '
1502 'to appear after {type}.{id} restart'.
1503 format(type=service_type,
1504 id=service_id))
1505 self.log("waiting on admin_socket for {type}-{id}, "
1506 "{command}".format(type=service_type,
1507 id=service_id,
1508 command=args))
1509 time.sleep(5)
1510
1511 def get_pool_dump(self, pool):
1512 """
1513 get the osd dump part of a pool
1514 """
1515 osd_dump = self.get_osd_dump_json()
1516 for i in osd_dump['pools']:
1517 if i['pool_name'] == pool:
1518 return i
1519 assert False
1520
1521 def get_config(self, service_type, service_id, name):
1522 """
1523 :param service_type: e.g. 'mon'; :param service_id: e.g. 'a'
1524 :param name: the option name
1525 """
1526 proc = self.wait_run_admin_socket(service_type, service_id,
1527 ['config', 'show'])
1528 j = json.loads(proc.stdout.getvalue())
1529 return j[name]
1530
1531 def set_config(self, osdnum, **argdict):
1532 """
1533 :param osdnum: osd number
1534 :param argdict: dictionary containing values to set.
1535 """
1536 for k, v in argdict.iteritems():
1537 self.wait_run_admin_socket(
1538 'osd', osdnum,
1539 ['config', 'set', str(k), str(v)])
1540
1541 def raw_cluster_status(self):
1542 """
1543 Get status from cluster
1544 """
1545 status = self.raw_cluster_cmd('status', '--format=json-pretty')
1546 return json.loads(status)
1547
1548 def raw_osd_status(self):
1549 """
1550 Get osd status from cluster
1551 """
1552 return self.raw_cluster_cmd('osd', 'dump')
1553
1554 def get_osd_status(self):
1555 """
1556 Get osd statuses sorted by states that the osds are in.
1557 """
1558 osd_lines = filter(
1559 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1560 self.raw_osd_status().split('\n'))
1561 self.log(osd_lines)
1562 in_osds = [int(i[4:].split()[0])
1563 for i in filter(lambda x: " in " in x, osd_lines)]
1564 out_osds = [int(i[4:].split()[0])
1565 for i in filter(lambda x: " out " in x, osd_lines)]
1566 up_osds = [int(i[4:].split()[0])
1567 for i in filter(lambda x: " up " in x, osd_lines)]
1568 down_osds = [int(i[4:].split()[0])
1569 for i in filter(lambda x: " down " in x, osd_lines)]
1570 dead_osds = [int(x.id_)
1571 for x in filter(lambda x:
1572 not x.running(),
1573 self.ctx.daemons.
1574 iter_daemons_of_role('osd', self.cluster))]
1575 live_osds = [int(x.id_) for x in
1576 filter(lambda x:
1577 x.running(),
1578 self.ctx.daemons.iter_daemons_of_role('osd',
1579 self.cluster))]
1580 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1581 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1582 'raw': osd_lines}
1583
1584 def get_num_pgs(self):
1585 """
1586 Check cluster status for the number of pgs
1587 """
1588 status = self.raw_cluster_status()
1589 self.log(status)
1590 return status['pgmap']['num_pgs']
1591
1592 def create_erasure_code_profile(self, profile_name, profile):
1593 """
1594 Create an erasure code profile name that can be used as a parameter
1595 when creating an erasure coded pool.
1596 """
1597 with self.lock:
1598 args = cmd_erasure_code_profile(profile_name, profile)
1599 self.raw_cluster_cmd(*args)
1600
1601 def create_pool_with_unique_name(self, pg_num=16,
1602 erasure_code_profile_name=None,
1603 min_size=None,
1604 erasure_code_use_overwrites=False):
1605 """
1606 Create a pool named unique_pool_X where X is unique.
1607 """
1608 name = ""
1609 with self.lock:
1610 name = "unique_pool_%s" % (str(self.next_pool_id),)
1611 self.next_pool_id += 1
1612 self.create_pool(
1613 name,
1614 pg_num,
1615 erasure_code_profile_name=erasure_code_profile_name,
1616 min_size=min_size,
1617 erasure_code_use_overwrites=erasure_code_use_overwrites)
1618 return name
1619
1620 @contextlib.contextmanager
1621 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1622 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1623 yield
1624 self.remove_pool(pool_name)
1625
1626 def create_pool(self, pool_name, pg_num=16,
1627 erasure_code_profile_name=None,
1628 min_size=None,
1629 erasure_code_use_overwrites=False):
1630 """
1631 Create a pool named from the pool_name parameter.
1632 :param pool_name: name of the pool being created.
1633 :param pg_num: initial number of pgs.
1634 :param erasure_code_profile_name: if set and !None create an
1635 erasure coded pool using the profile
1636 :param erasure_code_use_overwrites: if true, allow overwrites
1637 """
1638 with self.lock:
1639 assert isinstance(pool_name, basestring)
1640 assert isinstance(pg_num, int)
1641 assert pool_name not in self.pools
1642 self.log("creating pool_name %s" % (pool_name,))
1643 if erasure_code_profile_name:
1644 self.raw_cluster_cmd('osd', 'pool', 'create',
1645 pool_name, str(pg_num), str(pg_num),
1646 'erasure', erasure_code_profile_name)
1647 else:
1648 self.raw_cluster_cmd('osd', 'pool', 'create',
1649 pool_name, str(pg_num))
1650 if min_size is not None:
1651 self.raw_cluster_cmd(
1652 'osd', 'pool', 'set', pool_name,
1653 'min_size',
1654 str(min_size))
1655 if erasure_code_use_overwrites:
1656 self.raw_cluster_cmd(
1657 'osd', 'pool', 'set', pool_name,
1658 'allow_ec_overwrites',
1659 'true')
1660 self.raw_cluster_cmd(
1661 'osd', 'pool', 'application', 'enable',
1662 pool_name, 'rados', '--yes-i-really-mean-it',
1663 run.Raw('||'), 'true')
1664 self.pools[pool_name] = pg_num
1665 time.sleep(1)
1666
1667 def add_pool_snap(self, pool_name, snap_name):
1668 """
1669 Add pool snapshot
1670 :param pool_name: name of pool to snapshot
1671 :param snap_name: name of snapshot to take
1672 """
1673 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1674 str(pool_name), str(snap_name))
1675
1676 def remove_pool_snap(self, pool_name, snap_name):
1677 """
1678 Remove pool snapshot
1679 :param pool_name: name of pool to snapshot
1680 :param snap_name: name of snapshot to remove
1681 """
1682 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1683 str(pool_name), str(snap_name))
1684
1685 def remove_pool(self, pool_name):
1686 """
1687 Remove the indicated pool
1688 :param pool_name: Pool to be removed
1689 """
1690 with self.lock:
1691 assert isinstance(pool_name, basestring)
1692 assert pool_name in self.pools
1693 self.log("removing pool_name %s" % (pool_name,))
1694 del self.pools[pool_name]
1695 self.do_rados(self.controller,
1696 ['rmpool', pool_name, pool_name,
1697 "--yes-i-really-really-mean-it"])
1698
1699 def get_pool(self):
1700 """
1701 Pick a random pool
1702 """
1703 with self.lock:
1704 return random.choice(self.pools.keys())
1705
1706 def get_pool_pg_num(self, pool_name):
1707 """
1708 Return the number of pgs in the pool specified.
1709 """
1710 with self.lock:
1711 assert isinstance(pool_name, basestring)
1712 if pool_name in self.pools:
1713 return self.pools[pool_name]
1714 return 0
1715
1716 def get_pool_property(self, pool_name, prop):
1717 """
1718 :param pool_name: pool
1719 :param prop: property to be checked.
1720 :returns: property as an int value.
1721 """
1722 with self.lock:
1723 assert isinstance(pool_name, basestring)
1724 assert isinstance(prop, basestring)
1725 output = self.raw_cluster_cmd(
1726 'osd',
1727 'pool',
1728 'get',
1729 pool_name,
1730 prop)
1731 return int(output.split()[1])
1732
1733 def set_pool_property(self, pool_name, prop, val):
1734 """
1735 :param pool_name: pool
1736 :param prop: property to be set.
1737 :param val: value to set.
1738
1739 This routine retries if set operation fails.
1740 """
1741 with self.lock:
1742 assert isinstance(pool_name, basestring)
1743 assert isinstance(prop, basestring)
1744 assert isinstance(val, int)
1745 tries = 0
1746 while True:
1747 r = self.raw_cluster_cmd_result(
1748 'osd',
1749 'pool',
1750 'set',
1751 pool_name,
1752 prop,
1753 str(val))
1754 if r != 11: # EAGAIN
1755 break
1756 tries += 1
1757 if tries > 50:
1758 raise Exception('still getting EAGAIN after %d tries '
1759 'while setting pool property %s %s = %s' %
1760 (tries, pool_name, prop, val))
1761 self.log('got EAGAIN setting pool property, '
1762 'waiting a few seconds...')
1763 time.sleep(2)
1764
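# Hedged example call of set_pool_property() above; the pool name and value
# are assumptions. The retry loop transparently absorbs EAGAIN (errno 11)
# returned while the monitors are busy:
#
#   manager.set_pool_property('testpool', 'min_size', 2)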
1765 def expand_pool(self, pool_name, by, max_pgs):
1766 """
1767 Increase the number of pgs in a pool
1768 """
1769 with self.lock:
1770 assert isinstance(pool_name, basestring)
1771 assert isinstance(by, int)
1772 assert pool_name in self.pools
1773 if self.get_num_creating() > 0:
1774 return False
1775 if (self.pools[pool_name] + by) > max_pgs:
1776 return False
1777 self.log("increase pool %s pg_num by %d" % (pool_name, by))
1778 new_pg_num = self.pools[pool_name] + by
1779 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1780 self.pools[pool_name] = new_pg_num
1781 return True
1782
1783 def set_pool_pgpnum(self, pool_name, force):
1784 """
1785 Set the pgp_num property of pool_name pool to match its pg_num.
1786 """
1787 with self.lock:
1788 assert isinstance(pool_name, basestring)
1789 assert pool_name in self.pools
1790 if not force and self.get_num_creating() > 0:
1791 return False
1792 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1793 return True
1794
1795 def list_pg_missing(self, pgid):
1796 """
1797 Return the list of missing objects for the pg with the id specified
1798 """
1799 r = None
1800 offset = {}
1801 while True:
1802 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
1803 json.dumps(offset))
1804 j = json.loads(out)
1805 if r is None:
1806 r = j
1807 else:
1808 r['objects'].extend(j['objects'])
1809 if 'more' not in j:
1810 break
1811 if j['more'] == 0:
1812 break
1813 offset = j['objects'][-1]['oid']
1814 if 'more' in r:
1815 del r['more']
1816 return r
1817
1818 def get_pg_stats(self):
1819 """
1820 Dump the cluster and get pg stats
1821 """
1822 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
1823 j = json.loads('\n'.join(out.split('\n')[1:]))
1824 return j['pg_stats']
1825
1826 def get_pgids_to_force(self, backfill):
1827 """
1828 Return the randomized list of PGs that can have their recovery/backfill forced
1829 """
1830 j = self.get_pg_stats()
1831 pgids = []
1832 if backfill:
1833 wanted = ['degraded', 'backfilling', 'backfill_wait']
1834 else:
1835 wanted = ['recovering', 'degraded', 'recovery_wait']
1836 for pg in j:
1837 status = pg['state'].split('+')
1838 for t in wanted:
1839 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
1840 pgids.append(pg['pgid'])
1841 break
1842 return pgids
1843
1844 def get_pgids_to_cancel_force(self, backfill):
1845 """
1846 Return the randomized list of PGs whose recovery/backfill priority is forced
1847 """
1848 j = self.get_pg_stats()
1849 pgids = []
1850 if backfill:
1851 wanted = 'forced_backfill'
1852 else:
1853 wanted = 'forced_recovery'
1854 for pg in j:
1855 status = pg['state'].split('+')
1856 if wanted in status and random.random() > 0.5:
1857 pgids.append(pg['pgid'])
1858 return pgids
1859
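# Sketch (an assumption, not code from this module): the pgid lists returned
# by the two helpers above are typically handed to the pg force/cancel-force
# subcommands, e.g.:
#
#   pgids = manager.get_pgids_to_force(backfill=True)
#   if pgids:
#       manager.raw_cluster_cmd('pg', 'force-backfill', *pgids)
#   for pgid in manager.get_pgids_to_cancel_force(backfill=True):
#       manager.raw_cluster_cmd('pg', 'cancel-force-backfill', pgid)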
1860 def compile_pg_status(self):
1861 """
1862 Return a histogram of pg state values
1863 """
1864 ret = {}
1865 j = self.get_pg_stats()
1866 for pg in j:
1867 for status in pg['state'].split('+'):
1868 if status not in ret:
1869 ret[status] = 0
1870 ret[status] += 1
1871 return ret
1872
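# Illustrative return value of compile_pg_status() (made-up numbers): a
# histogram keyed by the individual '+'-separated state tokens, e.g.
#
#   {'active': 128, 'clean': 120, 'scrubbing': 2, 'degraded': 6}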
1873 @wait_for_pg_stats
1874 def with_pg_state(self, pool, pgnum, check):
1875 pgstr = self.get_pgid(pool, pgnum)
1876 stats = self.get_single_pg_stats(pgstr)
1877 assert(check(stats['state']))
1878
1879 @wait_for_pg_stats
1880 def with_pg(self, pool, pgnum, check):
1881 pgstr = self.get_pgid(pool, pgnum)
1882 stats = self.get_single_pg_stats(pgstr)
1883 return check(stats)
1884
1885 def get_last_scrub_stamp(self, pool, pgnum):
1886 """
1887 Get the timestamp of the last scrub.
1888 """
1889 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
1890 return stats["last_scrub_stamp"]
1891
1892 def do_pg_scrub(self, pool, pgnum, stype):
1893 """
1894 Scrub pg and wait for scrubbing to finish
1895 """
1896 init = self.get_last_scrub_stamp(pool, pgnum)
1897 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
1898 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1899 SLEEP_TIME = 10
1900 timer = 0
1901 while init == self.get_last_scrub_stamp(pool, pgnum):
1902 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1903 self.log("waiting for scrub type %s" % (stype,))
1904 if (timer % RESEND_TIMEOUT) == 0:
1905 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1906 # The first time in this loop is the actual request
1907 if timer != 0 and stype == "repair":
1908 self.log("WARNING: Resubmitted a non-idempotent repair")
1909 time.sleep(SLEEP_TIME)
1910 timer += SLEEP_TIME
1911
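# Hedged usage sketch for do_pg_scrub() above: stype is the pg subcommand to
# issue (typically 'scrub', 'deep-scrub' or 'repair'); the pool name and pg
# number are assumptions.
#
#   manager.do_pg_scrub('rbd', 0, 'deep-scrub')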
1912 def wait_snap_trimming_complete(self, pool):
1913 """
1914 Wait for snap trimming on pool to end
1915 """
1916 POLL_PERIOD = 10
1917 FATAL_TIMEOUT = 600
1918 start = time.time()
1919 poolnum = self.get_pool_num(pool)
1920 poolnumstr = "%s." % (poolnum,)
1921 while True:
1922 now = time.time()
1923 # give up once the fatal timeout has been exceeded
1924 assert (now - start) < FATAL_TIMEOUT, \
1925 'failed to complete snap trimming before timeout'
1926 all_stats = self.get_pg_stats()
1927 trimming = False
1928 for pg in all_stats:
1929 if pg['pgid'].startswith(poolnumstr) and ('snaptrim' in pg['state']):
1930 self.log("pg {pg} in trimming, state: {state}".format(
1931 pg=pg['pgid'],
1932 state=pg['state']))
1933 trimming = True
1934 if not trimming:
1935 break
1936 self.log("{pool} still trimming, waiting".format(pool=pool))
1937 time.sleep(POLL_PERIOD)
1938
1939 def get_single_pg_stats(self, pgid):
1940 """
1941 Return pg for the pgid specified.
1942 """
1943 all_stats = self.get_pg_stats()
1944
1945 for pg in all_stats:
1946 if pg['pgid'] == pgid:
1947 return pg
1948
1949 return None
1950
1951 def get_object_pg_with_shard(self, pool, name, osdid):
1952 """Return the pg that maps the named object on the given osd;
1953 for erasure-coded pools the shard index of osdid is appended."""
1954 pool_dump = self.get_pool_dump(pool)
1955 object_map = self.get_object_map(pool, name)
1956 if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1957 shard = object_map['acting'].index(osdid)
1958 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1959 shard=shard)
1960 else:
1961 return object_map['pgid']
1962
1963 def get_object_primary(self, pool, name):
1964 """Return the id of the acting primary osd
1965 for the named object in the given pool."""
1966 object_map = self.get_object_map(pool, name)
1967 return object_map['acting_primary']
1968
1969 def get_object_map(self, pool, name):
1970 """
1971 osd map --format=json converted to a python object
1972 :returns: the python object
1973 """
1974 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
1975 return json.loads('\n'.join(out.split('\n')[1:]))
1976
1977 def get_osd_dump_json(self):
1978 """
1979 osd dump --format=json converted to a python object
1980 :returns: the python object
1981 """
1982 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
1983 return json.loads('\n'.join(out.split('\n')[1:]))
1984
1985 def get_osd_dump(self):
1986 """
1987 Dump osds
1988 :returns: all osds
1989 """
1990 return self.get_osd_dump_json()['osds']
1991
1992 def get_mgr_dump(self):
1993 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
1994 return json.loads(out)
1995
1996 def get_stuck_pgs(self, type_, threshold):
1997 """
1998 :returns: stuck pg information from the cluster
1999 """
2000 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2001 '--format=json')
2002 return json.loads(out)
2003
2004 def get_num_unfound_objects(self):
2005 """
2006 Check cluster status to get the number of unfound objects
2007 """
2008 status = self.raw_cluster_status()
2009 self.log(status)
2010 return status['pgmap'].get('unfound_objects', 0)
2011
2012 def get_num_creating(self):
2013 """
2014 Find the number of pgs in creating mode.
2015 """
2016 pgs = self.get_pg_stats()
2017 num = 0
2018 for pg in pgs:
2019 if 'creating' in pg['state']:
2020 num += 1
2021 return num
2022
2023 def get_num_active_clean(self):
2024 """
2025 Find the number of active and clean pgs.
2026 """
2027 pgs = self.get_pg_stats()
2028 num = 0
2029 for pg in pgs:
2030 if (pg['state'].count('active') and
2031 pg['state'].count('clean') and
2032 not pg['state'].count('stale')):
2033 num += 1
2034 return num
2035
2036 def get_num_active_recovered(self):
2037 """
2038 Find the number of active and recovered pgs.
2039 """
2040 pgs = self.get_pg_stats()
2041 num = 0
2042 for pg in pgs:
2043 if (pg['state'].count('active') and
2044 not pg['state'].count('recover') and
2045 not pg['state'].count('backfill') and
2046 not pg['state'].count('stale')):
2047 num += 1
2048 return num
2049
2050 def get_is_making_recovery_progress(self):
2051 """
2052 Return whether there is recovery progress discernible in the
2053 raw cluster status
2054 """
2055 status = self.raw_cluster_status()
2056 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2057 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2058 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2059 return kps > 0 or bps > 0 or ops > 0
2060
2061 def get_num_active(self):
2062 """
2063 Find the number of active pgs.
2064 """
2065 pgs = self.get_pg_stats()
2066 num = 0
2067 for pg in pgs:
2068 if pg['state'].count('active') and not pg['state'].count('stale'):
2069 num += 1
2070 return num
2071
2072 def get_num_down(self):
2073 """
2074 Find the number of pgs that are down.
2075 """
2076 pgs = self.get_pg_stats()
2077 num = 0
2078 for pg in pgs:
2079 if ((pg['state'].count('down') and not
2080 pg['state'].count('stale')) or
2081 (pg['state'].count('incomplete') and not
2082 pg['state'].count('stale'))):
2083 num += 1
2084 return num
2085
2086 def get_num_active_down(self):
2087 """
2088 Find the number of pgs that are either active or down.
2089 """
2090 pgs = self.get_pg_stats()
2091 num = 0
2092 for pg in pgs:
2093 if ((pg['state'].count('active') and not
2094 pg['state'].count('stale')) or
2095 (pg['state'].count('down') and not
2096 pg['state'].count('stale')) or
2097 (pg['state'].count('incomplete') and not
2098 pg['state'].count('stale'))):
2099 num += 1
2100 return num
2101
2102 def is_clean(self):
2103 """
2104 True if all pgs are clean
2105 """
2106 return self.get_num_active_clean() == self.get_num_pgs()
2107
2108 def is_recovered(self):
2109 """
2110 True if all pgs have recovered
2111 """
2112 return self.get_num_active_recovered() == self.get_num_pgs()
2113
2114 def is_active_or_down(self):
2115 """
2116 True if all pgs are active or down
2117 """
2118 return self.get_num_active_down() == self.get_num_pgs()
2119
2120 def wait_for_clean(self, timeout=None):
2121 """
2122 Returns true when all pgs are clean.
2123 """
2124 self.log("waiting for clean")
2125 start = time.time()
2126 num_active_clean = self.get_num_active_clean()
2127 while not self.is_clean():
2128 if timeout is not None:
2129 if self.get_is_making_recovery_progress():
2130 self.log("making progress, resetting timeout")
2131 start = time.time()
2132 else:
2133 self.log("no progress seen, keeping timeout for now")
2134 if time.time() - start >= timeout:
2135 self.log('dumping pgs')
2136 out = self.raw_cluster_cmd('pg', 'dump')
2137 self.log(out)
2138 assert time.time() - start < timeout, \
2139 'failed to become clean before timeout expired'
2140 cur_active_clean = self.get_num_active_clean()
2141 if cur_active_clean != num_active_clean:
2142 start = time.time()
2143 num_active_clean = cur_active_clean
2144 time.sleep(3)
2145 self.log("clean!")
2146
2147 def are_all_osds_up(self):
2148 """
2149 Returns true if all osds are up.
2150 """
2151 x = self.get_osd_dump()
2152 return (len(x) == sum([(y['up'] > 0) for y in x]))
2153
2154 def wait_for_all_osds_up(self, timeout=None):
2155 """
2156 When this exits, either the timeout has expired, or all
2157 osds are up.
2158 """
2159 self.log("waiting for all up")
2160 start = time.time()
2161 while not self.are_all_osds_up():
2162 if timeout is not None:
2163 assert time.time() - start < timeout, \
2164 'timeout expired in wait_for_all_osds_up'
2165 time.sleep(3)
2166 self.log("all up!")
2167
2168 def pool_exists(self, pool):
2169 if pool in self.list_pools():
2170 return True
2171 return False
2172
2173 def wait_for_pool(self, pool, timeout=300):
2174 """
2175 Wait for a pool to exist
2176 """
2177 self.log('waiting for pool %s to exist' % pool)
2178 start = time.time()
2179 while not self.pool_exists(pool):
2180 if timeout is not None:
2181 assert time.time() - start < timeout, \
2182 'timeout expired in wait_for_pool'
2183 time.sleep(3)
2184
2185 def wait_for_pools(self, pools):
2186 for pool in pools:
2187 self.wait_for_pool(pool)
2188
2189 def is_mgr_available(self):
2190 x = self.get_mgr_dump()
2191 return x.get('available', False)
2192
2193 def wait_for_mgr_available(self, timeout=None):
2194 self.log("waiting for mgr available")
2195 start = time.time()
2196 while not self.is_mgr_available():
2197 if timeout is not None:
2198 assert time.time() - start < timeout, \
2199 'timeout expired in wait_for_mgr_available'
2200 time.sleep(3)
2201 self.log("mgr available!")
2202
2203 def wait_for_recovery(self, timeout=None):
2204 """
2205 Check recovery. When this exits, we have recovered.
2206 """
2207 self.log("waiting for recovery to complete")
2208 start = time.time()
2209 num_active_recovered = self.get_num_active_recovered()
2210 while not self.is_recovered():
2211 now = time.time()
2212 if timeout is not None:
2213 if self.get_is_making_recovery_progress():
2214 self.log("making progress, resetting timeout")
2215 start = time.time()
2216 else:
2217 self.log("no progress seen, keeping timeout for now")
2218 if now - start >= timeout:
2219 self.log('dumping pgs')
2220 out = self.raw_cluster_cmd('pg', 'dump')
2221 self.log(out)
2222 assert now - start < timeout, \
2223 'failed to recover before timeout expired'
2224 cur_active_recovered = self.get_num_active_recovered()
2225 if cur_active_recovered != num_active_recovered:
2226 start = time.time()
2227 num_active_recovered = cur_active_recovered
2228 time.sleep(3)
2229 self.log("recovered!")
2230
2231 def wait_for_active(self, timeout=None):
2232 """
2233 Check peering. When this exits, we are definitely active
2234 """
2235 self.log("waiting for peering to complete")
2236 start = time.time()
2237 num_active = self.get_num_active()
2238 while not self.is_active():
2239 if timeout is not None:
2240 if time.time() - start >= timeout:
2241 self.log('dumping pgs')
2242 out = self.raw_cluster_cmd('pg', 'dump')
2243 self.log(out)
2244 assert time.time() - start < timeout, \
2245 'failed to recover before timeout expired'
2246 cur_active = self.get_num_active()
2247 if cur_active != num_active:
2248 start = time.time()
2249 num_active = cur_active
2250 time.sleep(3)
2251 self.log("active!")
2252
2253 def wait_for_active_or_down(self, timeout=None):
2254 """
2255 Check peering. When this exits, we are definitely either
2256 active or down
2257 """
2258 self.log("waiting for peering to complete or become blocked")
2259 start = time.time()
2260 num_active_down = self.get_num_active_down()
2261 while not self.is_active_or_down():
2262 if timeout is not None:
2263 if time.time() - start >= timeout:
2264 self.log('dumping pgs')
2265 out = self.raw_cluster_cmd('pg', 'dump')
2266 self.log(out)
2267 assert time.time() - start < timeout, \
2268 'failed to recover before timeout expired'
2269 cur_active_down = self.get_num_active_down()
2270 if cur_active_down != num_active_down:
2271 start = time.time()
2272 num_active_down = cur_active_down
2273 time.sleep(3)
2274 self.log("active or down!")
2275
2276 def osd_is_up(self, osd):
2277 """
2278 Wrapper for osd check
2279 """
2280 osds = self.get_osd_dump()
2281 return osds[osd]['up'] > 0
2282
2283 def wait_till_osd_is_up(self, osd, timeout=None):
2284 """
2285 Loop waiting for osd.
2286 """
2287 self.log('waiting for osd.%d to be up' % osd)
2288 start = time.time()
2289 while not self.osd_is_up(osd):
2290 if timeout is not None:
2291 assert time.time() - start < timeout, \
2292 'osd.%d failed to come up before timeout expired' % osd
2293 time.sleep(3)
2294 self.log('osd.%d is up' % osd)
2295
2296 def is_active(self):
2297 """
2298 Wrapper to check if all pgs are active
2299 """
2300 return self.get_num_active() == self.get_num_pgs()
2301
2302 def wait_till_active(self, timeout=None):
2303 """
2304 Wait until all pgs are active.
2305 """
2306 self.log("waiting till active")
2307 start = time.time()
2308 while not self.is_active():
2309 if timeout is not None:
2310 if time.time() - start >= timeout:
2311 self.log('dumping pgs')
2312 out = self.raw_cluster_cmd('pg', 'dump')
2313 self.log(out)
2314 assert time.time() - start < timeout, \
2315 'failed to become active before timeout expired'
2316 time.sleep(3)
2317 self.log("active!")
2318
2319 def mark_out_osd(self, osd):
2320 """
2321 Wrapper to mark osd out.
2322 """
2323 self.raw_cluster_cmd('osd', 'out', str(osd))
2324
2325 def kill_osd(self, osd):
2326 """
2327 Kill osds by either power cycling (if indicated by the config)
2328 or by stopping.
2329 """
2330 if self.config.get('powercycle'):
2331 remote = self.find_remote('osd', osd)
2332 self.log('kill_osd on osd.{o} '
2333 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2334 self._assert_ipmi(remote)
2335 remote.console.power_off()
2336 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2337 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2338 self.raw_cluster_cmd(
2339 '--', 'tell', 'osd.%d' % osd,
2340 'injectargs',
2341 '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
2342 )
2343 try:
2344 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2345 except Exception:  # wait() raising means the osd went down as expected
2346 pass
2347 else:
2348 raise RuntimeError('osd.%s did not fail' % osd)
2349 else:
2350 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2351 else:
2352 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2353
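# Minimal thrasher config sketch (assumed values) that would exercise the
# bdev_inject_crash branch of kill_osd() above; the probability 0.5 matches
# the default used in the code, the crash value 2 is an assumption:
#
#   config = {'bdev_inject_crash': 2, 'bdev_inject_crash_probability': .5}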
2354 @staticmethod
2355 def _assert_ipmi(remote):
2356 assert remote.console.has_ipmi_credentials, (
2357 "powercycling requested but RemoteConsole is not "
2358 "initialized. Check ipmi config.")
2359
2360 def blackhole_kill_osd(self, osd):
2361 """
2362 Stop osd if nothing else works.
2363 """
2364 self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
2365 'injectargs',
2366 '--objectstore-blackhole')
2367 time.sleep(2)
2368 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2369
2370 def revive_osd(self, osd, timeout=150, skip_admin_check=False):
2371 """
2372 Revive osds by either power cycling (if indicated by the config)
2373 or by restarting.
2374 """
2375 if self.config.get('powercycle'):
2376 remote = self.find_remote('osd', osd)
2377 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2378 format(o=osd, s=remote.name))
2379 self._assert_ipmi(remote)
2380 remote.console.power_on()
2381 if not remote.console.check_status(300):
2382 raise Exception('Failed to revive osd.{o} via ipmi'.
2383 format(o=osd))
2384 teuthology.reconnect(self.ctx, 60, [remote])
2385 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2386 self.make_admin_daemon_dir(remote)
2387 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2388 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2389
2390 if not skip_admin_check:
2391 # wait for dump_ops_in_flight; this command doesn't appear
2392 # until after the signal handler is installed and it is safe
2393 # to stop the osd again without making valgrind leak checks
2394 # unhappy. see #5924.
2395 self.wait_run_admin_socket('osd', osd,
2396 args=['dump_ops_in_flight'],
2397 timeout=timeout, stdout=DEVNULL)
2398
2399 def mark_down_osd(self, osd):
2400 """
2401 Cluster command wrapper
2402 """
2403 self.raw_cluster_cmd('osd', 'down', str(osd))
2404
2405 def mark_in_osd(self, osd):
2406 """
2407 Cluster command wrapper
2408 """
2409 self.raw_cluster_cmd('osd', 'in', str(osd))
2410
2411 def signal_osd(self, osd, sig, silent=False):
2412 """
2413 Wrapper to local get_daemon call which sends the given
2414 signal to the given osd.
2415 """
2416 self.ctx.daemons.get_daemon('osd', osd,
2417 self.cluster).signal(sig, silent=silent)
2418
2419 ## monitors
2420 def signal_mon(self, mon, sig, silent=False):
2421 """
2422 Wrapper to local get_daemon call which sends the given signal to the given mon.
2423 """
2424 self.ctx.daemons.get_daemon('mon', mon,
2425 self.cluster).signal(sig, silent=silent)
2426
2427 def kill_mon(self, mon):
2428 """
2429 Kill the monitor by either power cycling (if the config says so),
2430 or by doing a stop.
2431 """
2432 if self.config.get('powercycle'):
2433 remote = self.find_remote('mon', mon)
2434 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2435 format(m=mon, s=remote.name))
2436 self._assert_ipmi(remote)
2437 remote.console.power_off()
2438 else:
2439 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2440
2441 def revive_mon(self, mon):
2442 """
2443 Restart by either power cycling (if the config says so),
2444 or by doing a normal restart.
2445 """
2446 if self.config.get('powercycle'):
2447 remote = self.find_remote('mon', mon)
2448 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2449 format(m=mon, s=remote.name))
2450 self._assert_ipmi(remote)
2451 remote.console.power_on()
2452 self.make_admin_daemon_dir(remote)
2453 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2454
2455 def revive_mgr(self, mgr):
2456 """
2457 Restart by either power cycling (if the config says so),
2458 or by doing a normal restart.
2459 """
2460 if self.config.get('powercycle'):
2461 remote = self.find_remote('mgr', mgr)
2462 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2463 format(m=mgr, s=remote.name))
2464 self._assert_ipmi(remote)
2465 remote.console.power_on()
2466 self.make_admin_daemon_dir(remote)
2467 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2468
2469 def get_mon_status(self, mon):
2470 """
2471 Extract all the monitor status information from the cluster
2472 """
2473 addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
2474 out = self.raw_cluster_cmd('-m', addr, 'mon_status')
2475 return json.loads(out)
2476
2477 def get_mon_quorum(self):
2478 """
2479 Extract monitor quorum information from the cluster
2480 """
2481 out = self.raw_cluster_cmd('quorum_status')
2482 j = json.loads(out)
2483 self.log('quorum_status is %s' % out)
2484 return j['quorum']
2485
2486 def wait_for_mon_quorum_size(self, size, timeout=300):
2487 """
2488 Loop until quorum size is reached.
2489 """
2490 self.log('waiting for quorum size %d' % size)
2491 start = time.time()
2492 while len(self.get_mon_quorum()) != size:
2493 if timeout is not None:
2494 assert time.time() - start < timeout, \
2495 ('failed to reach quorum size %d '
2496 'before timeout expired' % size)
2497 time.sleep(3)
2498 self.log("quorum is size %d" % size)
2499
2500 def get_mon_health(self, debug=False):
2501 """
2502 Extract all the monitor health information.
2503 """
2504 out = self.raw_cluster_cmd('health', '--format=json')
2505 if debug:
2506 self.log('health:\n{h}'.format(h=out))
2507 return json.loads(out)
2508
2509 def get_mds_status(self, mds):
2510 """
2511 Run cluster commands for the mds in order to get mds information
2512 """
2513 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
2514 j = json.loads(' '.join(out.splitlines()[1:]))
2515 # collate; for dup ids, larger gid wins.
2516 for info in j['info'].itervalues():
2517 if info['name'] == mds:
2518 return info
2519 return None
2520
2521 def get_filepath(self):
2522 """
2523 Return path to osd data with {id} needing to be replaced
2524 """
2525 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2526
2527 def make_admin_daemon_dir(self, remote):
2528 """
2529 Create /var/run/ceph directory on remote site.
2530
2531 :param ctx: Context
2532 :param remote: Remote site
2533 """
2534 remote.run(args=['sudo',
2535 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2536
2537
2538 def utility_task(name):
2539 """
2540 Generate ceph_manager subtask corresponding to ceph_manager
2541 method name
2542 """
2543 def task(ctx, config):
2544 if config is None:
2545 config = {}
2546 args = config.get('args', [])
2547 kwargs = config.get('kwargs', {})
2548 cluster = config.get('cluster', 'ceph')
2549 fn = getattr(ctx.managers[cluster], name)
2550 fn(*args, **kwargs)
2551 return task
2552
2553 revive_osd = utility_task("revive_osd")
2554 revive_mon = utility_task("revive_mon")
2555 kill_osd = utility_task("kill_osd")
2556 kill_mon = utility_task("kill_mon")
2557 create_pool = utility_task("create_pool")
2558 remove_pool = utility_task("remove_pool")
2559 wait_for_clean = utility_task("wait_for_clean")
2560 flush_all_pg_stats = utility_task("flush_all_pg_stats")
2561 set_pool_property = utility_task("set_pool_property")
2562 do_pg_scrub = utility_task("do_pg_scrub")
2563 wait_for_pool = utility_task("wait_for_pool")
2564 wait_for_pools = utility_task("wait_for_pools")
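# Hedged teuthology job sketch (not part of this module) showing how the
# generated utility tasks above could be referenced from a suite yaml; the
# pool name and pg count are assumptions:
#
#   tasks:
#   - ceph:
#   - ceph_manager.create_pool:
#       args: ['testpool']
#       kwargs: {pg_num: 32}
#   - ceph_manager.wait_for_clean: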