1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from cStringIO import StringIO
5 from functools import wraps
6 import contextlib
7 import random
8 import signal
9 import time
10 import gevent
11 import base64
12 import json
13 import logging
14 import threading
15 import traceback
16 import os
17 from teuthology import misc as teuthology
18 from tasks.scrub import Scrubber
19 from util.rados import cmd_erasure_code_profile
20 from util import get_remote
21 from teuthology.contextutil import safe_while
22 from teuthology.orchestra.remote import Remote
23 from teuthology.orchestra import run
24 from teuthology.exceptions import CommandFailedError
25
26 try:
27 from subprocess import DEVNULL # py3k
28 except ImportError:
29 DEVNULL = open(os.devnull, 'r+')
30
31 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33 log = logging.getLogger(__name__)
34
35
36 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
37 conf_fp = StringIO()
38 ctx.ceph[cluster].conf.write(conf_fp)
39 conf_fp.seek(0)
40 writes = ctx.cluster.run(
41 args=[
42 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
44 'sudo', 'python',
45 '-c',
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
48 conf_path,
49 run.Raw('&&'),
50 'sudo', 'chmod', '0644', conf_path,
51 ],
52 stdin=run.PIPE,
53 wait=False)
54 teuthology.feed_many_stdins_and_close(conf_fp, writes)
55 run.wait(writes)
56
57
58 def mount_osd_data(ctx, remote, cluster, osd):
59 """
60 Mount a remote OSD
61
62 :param ctx: Context
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
65 :param osd: Osd name
66 """
67 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68 role = "{0}.osd.{1}".format(cluster, osd)
69 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70 if remote in ctx.disk_config.remote_to_roles_to_dev:
71 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
72 role = alt_role
73 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
74 return
75 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76 mount_options = ctx.disk_config.\
77 remote_to_roles_to_dev_mount_options[remote][role]
78 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
80
81     log.info('Mounting osd.{o} on {n}: dev: {d}, cluster: {c}, '
82              'mountpoint: {p}, type: {t}, options: {v}'.format(
83                  o=osd, n=remote.name, d=dev, p=mnt, t=fstype,
84                  v=mount_options, c=cluster))
85
86 remote.run(
87 args=[
88 'sudo',
89 'mount',
90 '-t', fstype,
91 '-o', ','.join(mount_options),
92 dev,
93 mnt,
94 ]
95 )
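
# Illustrative usage sketch, not part of the original module; the remote,
# device, fstype and osd id below are hypothetical:
#
#   mount_osd_data(ctx, remote, 'ceph', '3')
#
# which, given recorded disk_config entries of dev='/dev/vdb', fstype='xfs'
# and mount_options=['noatime'], ends up running roughly:
#
#   sudo mount -t xfs -o noatime /dev/vdb /var/lib/ceph/osd/ceph-3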
96
97
98 class Thrasher:
99 """
100 Object used to thrash Ceph
101 """
102 def __init__(self, manager, config, logger=None):
103 self.ceph_manager = manager
104 self.cluster = manager.cluster
105 self.ceph_manager.wait_for_clean()
106 osd_status = self.ceph_manager.get_osd_status()
107 self.in_osds = osd_status['in']
108 self.live_osds = osd_status['live']
109 self.out_osds = osd_status['out']
110 self.dead_osds = osd_status['dead']
111 self.stopping = False
112 self.logger = logger
113 self.config = config
114 self.revive_timeout = self.config.get("revive_timeout", 360)
115 self.pools_to_fix_pgp_num = set()
116 if self.config.get('powercycle'):
117 self.revive_timeout += 120
118 self.clean_wait = self.config.get('clean_wait', 0)
119 self.minin = self.config.get("min_in", 4)
120 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121 self.sighup_delay = self.config.get('sighup_delay')
122 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123 self.dump_ops_enable = self.config.get('dump_ops_enable')
124 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
127         self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
128 self.random_eio = self.config.get('random_eio')
129 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
130
131 num_osds = self.in_osds + self.out_osds
132 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
133 self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
134 if self.logger is not None:
135 self.log = lambda x: self.logger.info(x)
136 else:
137 def tmp(x):
138 """
139 Implement log behavior
140 """
141 print x
142 self.log = tmp
143 if self.config is None:
144 self.config = dict()
145 # prevent monitor from auto-marking things out while thrasher runs
146 # try both old and new tell syntax, in case we are testing old code
147 self.saved_options = []
148 # assuming that the default settings do not vary from one daemon to
149 # another
150 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
151 opts = [('mon', 'mon_osd_down_out_interval', 0)]
152 for service, opt, new_value in opts:
153 old_value = manager.get_config(first_mon[0],
154 first_mon[1],
155 opt)
156 self.saved_options.append((service, opt, old_value))
157 manager.inject_args(service, '*', opt, new_value)
158 # initialize ceph_objectstore_tool property - must be done before
159 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
160 if (self.config.get('powercycle') or
161 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
162 self.config.get('disable_objectstore_tool_tests', False)):
163 self.ceph_objectstore_tool = False
164 if self.config.get('powercycle'):
165 self.log("Unable to test ceph-objectstore-tool, "
166 "powercycle testing")
167 else:
168 self.log("Unable to test ceph-objectstore-tool, "
169 "not available on all OSD nodes")
170 else:
171 self.ceph_objectstore_tool = \
172 self.config.get('ceph_objectstore_tool', True)
173 # spawn do_thrash
174 self.thread = gevent.spawn(self.do_thrash)
175 if self.sighup_delay:
176 self.sighup_thread = gevent.spawn(self.do_sighup)
177 if self.optrack_toggle_delay:
178 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
179 if self.dump_ops_enable == "true":
180 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
181 if self.noscrub_toggle_delay:
182 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
183
184 def cmd_exists_on_osds(self, cmd):
185 allremotes = self.ceph_manager.ctx.cluster.only(\
186 teuthology.is_type('osd', self.cluster)).remotes.keys()
187 allremotes = list(set(allremotes))
188 for remote in allremotes:
189 proc = remote.run(args=['type', cmd], wait=True,
190 check_status=False, stdout=StringIO(),
191 stderr=StringIO())
192 if proc.exitstatus != 0:
193                 return False
194         return True
195
196 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
197 """
198 :param osd: Osd to be killed.
199 :mark_down: Mark down if true.
200 :mark_out: Mark out if true.
201 """
202 if osd is None:
203 osd = random.choice(self.live_osds)
204 self.log("Killing osd %s, live_osds are %s" % (str(osd),
205 str(self.live_osds)))
206 self.live_osds.remove(osd)
207 self.dead_osds.append(osd)
208 self.ceph_manager.kill_osd(osd)
209 if mark_down:
210 self.ceph_manager.mark_down_osd(osd)
211 if mark_out and osd in self.in_osds:
212 self.out_osd(osd)
213 if self.ceph_objectstore_tool:
214 self.log("Testing ceph-objectstore-tool on down osd")
215 remote = self.ceph_manager.find_remote('osd', osd)
216 FSPATH = self.ceph_manager.get_filepath()
217 JPATH = os.path.join(FSPATH, "journal")
218 exp_osd = imp_osd = osd
219 exp_remote = imp_remote = remote
220 # If an older osd is available we'll move a pg from there
221 if (len(self.dead_osds) > 1 and
222 random.random() < self.chance_move_pg):
223 exp_osd = random.choice(self.dead_osds[:-1])
224 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
225 if ('keyvaluestore_backend' in
226 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
227 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
228 "--data-path {fpath} --journal-path {jpath} "
229 "--type keyvaluestore "
230 "--log-file="
231 "/var/log/ceph/objectstore_tool.\\$pid.log ".
232 format(fpath=FSPATH, jpath=JPATH))
233 else:
234 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
235 "--data-path {fpath} --journal-path {jpath} "
236 "--log-file="
237 "/var/log/ceph/objectstore_tool.\\$pid.log ".
238 format(fpath=FSPATH, jpath=JPATH))
239 cmd = (prefix + "--op list-pgs").format(id=exp_osd)
240
241 # ceph-objectstore-tool might be temporarily absent during an
242 # upgrade - see http://tracker.ceph.com/issues/18014
243 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
244 while proceed():
245 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
246 wait=True, check_status=False, stdout=StringIO(),
247 stderr=StringIO())
248 if proc.exitstatus == 0:
249 break
250 log.debug("ceph-objectstore-tool binary not present, trying again")
251
252 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
253 # see http://tracker.ceph.com/issues/19556
254 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
255 while proceed():
256 proc = exp_remote.run(args=cmd, wait=True,
257 check_status=False,
258 stdout=StringIO(), stderr=StringIO())
259 if proc.exitstatus == 0:
260 break
261                     elif proc.exitstatus == 1 and "OSD has the store locked" in proc.stderr.getvalue():
262 continue
263 else:
264 raise Exception("ceph-objectstore-tool: "
265 "exp list-pgs failure with status {ret}".
266 format(ret=proc.exitstatus))
267
268 pgs = proc.stdout.getvalue().split('\n')[:-1]
269 if len(pgs) == 0:
270 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
271 return
272 pg = random.choice(pgs)
273 exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
274 exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
275 exp_path = os.path.join(exp_path,
276 "exp.{pg}.{id}".format(
277 pg=pg,
278 id=exp_osd))
279 # export
280 # Can't use new export-remove op since this is part of upgrade testing
281 cmd = prefix + "--op export --pgid {pg} --file {file}"
282 cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
283 proc = exp_remote.run(args=cmd)
284 if proc.exitstatus:
285 raise Exception("ceph-objectstore-tool: "
286 "export failure with status {ret}".
287 format(ret=proc.exitstatus))
288 # remove
289 cmd = prefix + "--force --op remove --pgid {pg}"
290 cmd = cmd.format(id=exp_osd, pg=pg)
291 proc = exp_remote.run(args=cmd)
292 if proc.exitstatus:
293 raise Exception("ceph-objectstore-tool: "
294 "remove failure with status {ret}".
295 format(ret=proc.exitstatus))
296 # If there are at least 2 dead osds we might move the pg
297 if exp_osd != imp_osd:
298 # If pg isn't already on this osd, then we will move it there
299 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
300 proc = imp_remote.run(args=cmd, wait=True,
301 check_status=False, stdout=StringIO())
302 if proc.exitstatus:
303 raise Exception("ceph-objectstore-tool: "
304 "imp list-pgs failure with status {ret}".
305 format(ret=proc.exitstatus))
306 pgs = proc.stdout.getvalue().split('\n')[:-1]
307 if pg not in pgs:
308 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
309 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
310 if imp_remote != exp_remote:
311 # Copy export file to the other machine
312 self.log("Transfer export file from {srem} to {trem}".
313 format(srem=exp_remote, trem=imp_remote))
314 tmpexport = Remote.get_file(exp_remote, exp_path)
315 Remote.put_file(imp_remote, tmpexport, exp_path)
316 os.remove(tmpexport)
317 else:
318 # Can't move the pg after all
319 imp_osd = exp_osd
320 imp_remote = exp_remote
321 # import
322 cmd = (prefix + "--op import --file {file}")
323 cmd = cmd.format(id=imp_osd, file=exp_path)
324 proc = imp_remote.run(args=cmd, wait=True, check_status=False,
325 stderr=StringIO())
326 if proc.exitstatus == 1:
327 bogosity = "The OSD you are using is older than the exported PG"
328 if bogosity in proc.stderr.getvalue():
329 self.log("OSD older than exported PG"
330 "...ignored")
331 elif proc.exitstatus == 10:
332 self.log("Pool went away before processing an import"
333 "...ignored")
334 elif proc.exitstatus == 11:
335 self.log("Attempt to import an incompatible export"
336 "...ignored")
337 elif proc.exitstatus == 12:
338 # this should be safe to ignore because we only ever move 1
339 # copy of the pg at a time, and merge is only initiated when
340 # all replicas are peered and happy. /me crosses fingers
341 self.log("PG merged on target"
342 "...ignored")
343 elif proc.exitstatus:
344 raise Exception("ceph-objectstore-tool: "
345 "import failure with status {ret}".
346 format(ret=proc.exitstatus))
347 cmd = "rm -f {file}".format(file=exp_path)
348 exp_remote.run(args=cmd)
349 if imp_remote != exp_remote:
350 imp_remote.run(args=cmd)
351
352 # apply low split settings to each pool
353 for pool in self.ceph_manager.list_pools():
354 no_sudo_prefix = prefix[5:]
355 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
356 "--filestore-split-multiple 1' sudo -E "
357 + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
358 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
359 output = proc.stderr.getvalue()
360 if 'Couldn\'t find pool' in output:
361 continue
362 if proc.exitstatus:
363 raise Exception("ceph-objectstore-tool apply-layout-settings"
364 " failed with {status}".format(status=proc.exitstatus))
365
366
367 def blackhole_kill_osd(self, osd=None):
368 """
369 If all else fails, kill the osd.
370 :param osd: Osd to be killed.
371 """
372 if osd is None:
373 osd = random.choice(self.live_osds)
374 self.log("Blackholing and then killing osd %s, live_osds are %s" %
375 (str(osd), str(self.live_osds)))
376 self.live_osds.remove(osd)
377 self.dead_osds.append(osd)
378 self.ceph_manager.blackhole_kill_osd(osd)
379
380 def revive_osd(self, osd=None, skip_admin_check=False):
381 """
382 Revive the osd.
383 :param osd: Osd to be revived.
384 """
385 if osd is None:
386 osd = random.choice(self.dead_osds)
387 self.log("Reviving osd %s" % (str(osd),))
388 self.ceph_manager.revive_osd(
389 osd,
390 self.revive_timeout,
391 skip_admin_check=skip_admin_check)
392 self.dead_osds.remove(osd)
393 self.live_osds.append(osd)
394 if self.random_eio > 0 and osd == self.rerrosd:
395 self.ceph_manager.set_config(self.rerrosd,
396 filestore_debug_random_read_err = self.random_eio)
397 self.ceph_manager.set_config(self.rerrosd,
398 bluestore_debug_random_read_err = self.random_eio)
399
400
401 def out_osd(self, osd=None):
402 """
403 Mark the osd out
404 :param osd: Osd to be marked.
405 """
406 if osd is None:
407 osd = random.choice(self.in_osds)
408 self.log("Removing osd %s, in_osds are: %s" %
409 (str(osd), str(self.in_osds)))
410 self.ceph_manager.mark_out_osd(osd)
411 self.in_osds.remove(osd)
412 self.out_osds.append(osd)
413
414 def in_osd(self, osd=None):
415 """
416         Mark the osd in (reviving it first if it is dead)
417 :param osd: Osd to be marked.
418 """
419 if osd is None:
420 osd = random.choice(self.out_osds)
421 if osd in self.dead_osds:
422 return self.revive_osd(osd)
423 self.log("Adding osd %s" % (str(osd),))
424 self.out_osds.remove(osd)
425 self.in_osds.append(osd)
426 self.ceph_manager.mark_in_osd(osd)
427 self.log("Added osd %s" % (str(osd),))
428
429 def reweight_osd_or_by_util(self, osd=None):
430 """
431         Reweight a random in osd, or run reweight-by-utilization
432 :param osd: Osd to be marked.
433 """
434 if osd is not None or random.choice([True, False]):
435 if osd is None:
436 osd = random.choice(self.in_osds)
437 val = random.uniform(.1, 1.0)
438 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
439 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
440 str(osd), str(val))
441 else:
442 # do it several times, the option space is large
443 for i in range(5):
444 options = {
445 'max_change': random.choice(['0.05', '1.0', '3.0']),
446 'overage': random.choice(['110', '1000']),
447 'type': random.choice([
448 'reweight-by-utilization',
449 'test-reweight-by-utilization']),
450 }
451 self.log("Reweighting by: %s"%(str(options),))
452 self.ceph_manager.raw_cluster_cmd(
453 'osd',
454 options['type'],
455 options['overage'],
456 options['max_change'])
457
458 def primary_affinity(self, osd=None):
459 if osd is None:
460 osd = random.choice(self.in_osds)
461 if random.random() >= .5:
462 pa = random.random()
463 elif random.random() >= .5:
464 pa = 1
465 else:
466 pa = 0
467 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
468 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
469 str(osd), str(pa))
470
471 def thrash_cluster_full(self):
472 """
473 Set and unset cluster full condition
474 """
475 self.log('Setting full ratio to .001')
476 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
477 time.sleep(1)
478 self.log('Setting full ratio back to .95')
479 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
480
481 def thrash_pg_upmap(self):
482 """
483 Install or remove random pg_upmap entries in OSDMap
484 """
485 from random import shuffle
486 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
487 j = json.loads(out)
488 self.log('j is %s' % j)
489 try:
490 if random.random() >= .3:
491 pgs = self.ceph_manager.get_pg_stats()
492 pg = random.choice(pgs)
493 pgid = str(pg['pgid'])
494 poolid = int(pgid.split('.')[0])
495 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
496 if len(sizes) == 0:
497 return
498 n = sizes[0]
499 osds = self.in_osds + self.out_osds
500 shuffle(osds)
501 osds = osds[0:n]
502 self.log('Setting %s to %s' % (pgid, osds))
503 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
504 self.log('cmd %s' % cmd)
505 self.ceph_manager.raw_cluster_cmd(*cmd)
506 else:
507 m = j['pg_upmap']
508 if len(m) > 0:
509 shuffle(m)
510 pg = m[0]['pgid']
511 self.log('Clearing pg_upmap on %s' % pg)
512 self.ceph_manager.raw_cluster_cmd(
513 'osd',
514 'rm-pg-upmap',
515 pg)
516 else:
517 self.log('No pg_upmap entries; doing nothing')
518 except CommandFailedError:
519 self.log('Failed to rm-pg-upmap, ignoring')
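
# Illustrative sketch for thrash_pg_upmap() above (all values hypothetical):
# with pgid '2.7', a pool size of 3 and shuffled osds [4, 0, 9], the first
# branch issues `ceph osd pg-upmap 2.7 4 0 9`; otherwise a random existing
# entry is picked from the osdmap's pg_upmap list and removed with
# `ceph osd rm-pg-upmap <pgid>`.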
520
521 def thrash_pg_upmap_items(self):
522 """
523 Install or remove random pg_upmap_items entries in OSDMap
524 """
525 from random import shuffle
526 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
527 j = json.loads(out)
528 self.log('j is %s' % j)
529 try:
530 if random.random() >= .3:
531 pgs = self.ceph_manager.get_pg_stats()
532 pg = random.choice(pgs)
533 pgid = str(pg['pgid'])
534 poolid = int(pgid.split('.')[0])
535 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
536 if len(sizes) == 0:
537 return
538 n = sizes[0]
539 osds = self.in_osds + self.out_osds
540 shuffle(osds)
541 osds = osds[0:n*2]
542 self.log('Setting %s to %s' % (pgid, osds))
543 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
544 self.log('cmd %s' % cmd)
545 self.ceph_manager.raw_cluster_cmd(*cmd)
546 else:
547 m = j['pg_upmap_items']
548 if len(m) > 0:
549 shuffle(m)
550 pg = m[0]['pgid']
551 self.log('Clearing pg_upmap on %s' % pg)
552 self.ceph_manager.raw_cluster_cmd(
553 'osd',
554 'rm-pg-upmap-items',
555 pg)
556 else:
557 self.log('No pg_upmap entries; doing nothing')
558 except CommandFailedError:
559 self.log('Failed to rm-pg-upmap-items, ignoring')
560
561 def force_recovery(self):
562 """
563         Force recovery or backfill on some PGs
564 """
565 backfill = random.random() >= 0.5
566 j = self.ceph_manager.get_pgids_to_force(backfill)
567 if j:
568 try:
569 if backfill:
570 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
571 else:
572 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
573 except CommandFailedError:
574 self.log('Failed to force backfill|recovery, ignoring')
575
576
577 def cancel_force_recovery(self):
578 """
579         Cancel forced recovery or backfill on some PGs
580 """
581 backfill = random.random() >= 0.5
582 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
583 if j:
584 try:
585 if backfill:
586 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
587 else:
588 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
589 except CommandFailedError:
590                 self.log('Failed to cancel force backfill|recovery, ignoring')
591
592 def force_cancel_recovery(self):
593 """
594 Force or cancel forcing recovery
595 """
596 if random.random() >= 0.4:
597 self.force_recovery()
598 else:
599 self.cancel_force_recovery()
600
601 def all_up(self):
602 """
603 Make sure all osds are up and not out.
604 """
605 while len(self.dead_osds) > 0:
606 self.log("reviving osd")
607 self.revive_osd()
608 while len(self.out_osds) > 0:
609 self.log("inning osd")
610 self.in_osd()
611
612 def all_up_in(self):
613 """
614 Make sure all osds are up and fully in.
615 """
616         self.all_up()
617 for osd in self.live_osds:
618 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
619 str(osd), str(1))
620 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
621 str(osd), str(1))
622
623 def do_join(self):
624 """
625         Stop the thrasher and join its greenlets
626 """
627 self.stopping = True
628 self.thread.get()
629 if self.sighup_delay:
630 self.log("joining the do_sighup greenlet")
631 self.sighup_thread.get()
632 if self.optrack_toggle_delay:
633 self.log("joining the do_optrack_toggle greenlet")
634 self.optrack_toggle_thread.join()
635 if self.dump_ops_enable == "true":
636 self.log("joining the do_dump_ops greenlet")
637 self.dump_ops_thread.join()
638 if self.noscrub_toggle_delay:
639 self.log("joining the do_noscrub_toggle greenlet")
640 self.noscrub_toggle_thread.join()
641
642 def grow_pool(self):
643 """
644         Increase the pg_num of the pool
645 """
646 pool = self.ceph_manager.get_pool()
647 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
648 self.log("Growing pool %s" % (pool,))
649 if self.ceph_manager.expand_pool(pool,
650 self.config.get('pool_grow_by', 10),
651 self.max_pgs):
652 self.pools_to_fix_pgp_num.add(pool)
653
654 def shrink_pool(self):
655 """
656         Decrease the pg_num of the pool
657 """
658 pool = self.ceph_manager.get_pool()
659 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
660 self.log("Shrinking pool %s" % (pool,))
661 if self.ceph_manager.contract_pool(
662 pool,
663 self.config.get('pool_shrink_by', 10),
664 self.min_pgs):
665 self.pools_to_fix_pgp_num.add(pool)
666
667 def fix_pgp_num(self, pool=None):
668 """
669 Fix number of pgs in pool.
670 """
671 if pool is None:
672 pool = self.ceph_manager.get_pool()
673 force = False
674 else:
675 force = True
676 self.log("fixing pg num pool %s" % (pool,))
677 if self.ceph_manager.set_pool_pgpnum(pool, force):
678 self.pools_to_fix_pgp_num.discard(pool)
679
680 def test_pool_min_size(self):
681 """
682 Kill and revive all osds except one.
683 """
684 self.log("test_pool_min_size")
685 self.all_up()
686 self.ceph_manager.wait_for_recovery(
687 timeout=self.config.get('timeout')
688 )
689 the_one = random.choice(self.in_osds)
690 self.log("Killing everyone but %s", the_one)
691 to_kill = filter(lambda x: x != the_one, self.in_osds)
692 [self.kill_osd(i) for i in to_kill]
693 [self.out_osd(i) for i in to_kill]
694 time.sleep(self.config.get("test_pool_min_size_time", 10))
695 self.log("Killing %s" % (the_one,))
696 self.kill_osd(the_one)
697 self.out_osd(the_one)
698 self.log("Reviving everyone but %s" % (the_one,))
699 [self.revive_osd(i) for i in to_kill]
700 [self.in_osd(i) for i in to_kill]
701 self.log("Revived everyone but %s" % (the_one,))
702 self.log("Waiting for clean")
703 self.ceph_manager.wait_for_recovery(
704 timeout=self.config.get('timeout')
705 )
706
707 def inject_pause(self, conf_key, duration, check_after, should_be_down):
708 """
709 Pause injection testing. Check for osd being down when finished.
710 """
711 the_one = random.choice(self.live_osds)
712 self.log("inject_pause on {osd}".format(osd=the_one))
713 self.log(
714 "Testing {key} pause injection for duration {duration}".format(
715 key=conf_key,
716 duration=duration
717 ))
718 self.log(
719 "Checking after {after}, should_be_down={shouldbedown}".format(
720 after=check_after,
721 shouldbedown=should_be_down
722 ))
723 self.ceph_manager.set_config(the_one, **{conf_key: duration})
724 if not should_be_down:
725 return
726 time.sleep(check_after)
727 status = self.ceph_manager.get_osd_status()
728 assert the_one in status['down']
729 time.sleep(duration - check_after + 20)
730 status = self.ceph_manager.get_osd_status()
731         assert the_one not in status['down']
732
733 def test_backfill_full(self):
734 """
735 Test backfills stopping when the replica fills up.
736
737         First, use the injectfull admin command to simulate a
738         backfillfull condition on all of the OSDs.
739
740 Second, on a random subset, set
741 osd_debug_skip_full_check_in_backfill_reservation to force
742 the more complicated check in do_scan to be exercised.
743
744 Then, verify that all backfillings stop.
745 """
746 self.log("injecting backfill full")
747 for i in self.live_osds:
748 self.ceph_manager.set_config(
749 i,
750 osd_debug_skip_full_check_in_backfill_reservation=
751 random.choice(['false', 'true']))
752 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
753 check_status=True, timeout=30, stdout=DEVNULL)
754 for i in range(30):
755 status = self.ceph_manager.compile_pg_status()
756 if 'backfilling' not in status.keys():
757 break
758 self.log(
759 "waiting for {still_going} backfillings".format(
760 still_going=status.get('backfilling')))
761 time.sleep(1)
762 assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
763 for i in self.live_osds:
764 self.ceph_manager.set_config(
765 i,
766 osd_debug_skip_full_check_in_backfill_reservation='false')
767 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
768 check_status=True, timeout=30, stdout=DEVNULL)
769
770 def test_map_discontinuity(self):
771 """
772 1) Allows the osds to recover
773 2) kills an osd
774 3) allows the remaining osds to recover
775 4) waits for some time
776 5) revives the osd
777 This sequence should cause the revived osd to have to handle
778 a map gap since the mons would have trimmed
779 """
780 while len(self.in_osds) < (self.minin + 1):
781 self.in_osd()
782 self.log("Waiting for recovery")
783 self.ceph_manager.wait_for_all_osds_up(
784 timeout=self.config.get('timeout')
785 )
786 # now we wait 20s for the pg status to change, if it takes longer,
787 # the test *should* fail!
788 time.sleep(20)
789 self.ceph_manager.wait_for_clean(
790 timeout=self.config.get('timeout')
791 )
792
793 # now we wait 20s for the backfill replicas to hear about the clean
794 time.sleep(20)
795 self.log("Recovered, killing an osd")
796 self.kill_osd(mark_down=True, mark_out=True)
797 self.log("Waiting for clean again")
798 self.ceph_manager.wait_for_clean(
799 timeout=self.config.get('timeout')
800 )
801 self.log("Waiting for trim")
802 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
803 self.revive_osd()
804
805 def choose_action(self):
806 """
807 Random action selector.
808 """
809 chance_down = self.config.get('chance_down', 0.4)
810 chance_test_min_size = self.config.get('chance_test_min_size', 0)
811 chance_test_backfill_full = \
812 self.config.get('chance_test_backfill_full', 0)
813 if isinstance(chance_down, int):
814 chance_down = float(chance_down) / 100
815 minin = self.minin
816 minout = self.config.get("min_out", 0)
817 minlive = self.config.get("min_live", 2)
818 mindead = self.config.get("min_dead", 0)
819
820 self.log('choose_action: min_in %d min_out '
821 '%d min_live %d min_dead %d' %
822 (minin, minout, minlive, mindead))
823 actions = []
824 if len(self.in_osds) > minin:
825 actions.append((self.out_osd, 1.0,))
826 if len(self.live_osds) > minlive and chance_down > 0:
827 actions.append((self.kill_osd, chance_down,))
828 if len(self.out_osds) > minout:
829 actions.append((self.in_osd, 1.7,))
830 if len(self.dead_osds) > mindead:
831 actions.append((self.revive_osd, 1.0,))
832 if self.config.get('thrash_primary_affinity', True):
833 actions.append((self.primary_affinity, 1.0,))
834 actions.append((self.reweight_osd_or_by_util,
835 self.config.get('reweight_osd', .5),))
836 actions.append((self.grow_pool,
837 self.config.get('chance_pgnum_grow', 0),))
838 actions.append((self.shrink_pool,
839 self.config.get('chance_pgnum_shrink', 0),))
840 actions.append((self.fix_pgp_num,
841 self.config.get('chance_pgpnum_fix', 0),))
842 actions.append((self.test_pool_min_size,
843 chance_test_min_size,))
844 actions.append((self.test_backfill_full,
845 chance_test_backfill_full,))
846 if self.chance_thrash_cluster_full > 0:
847 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
848 if self.chance_thrash_pg_upmap > 0:
849 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
850 if self.chance_thrash_pg_upmap_items > 0:
851 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
852 if self.chance_force_recovery > 0:
853 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
854
855 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
856 for scenario in [
857 (lambda:
858 self.inject_pause(key,
859 self.config.get('pause_short', 3),
860 0,
861 False),
862 self.config.get('chance_inject_pause_short', 1),),
863 (lambda:
864 self.inject_pause(key,
865 self.config.get('pause_long', 80),
866 self.config.get('pause_check_after', 70),
867 True),
868 self.config.get('chance_inject_pause_long', 0),)]:
869 actions.append(scenario)
870
871 total = sum([y for (x, y) in actions])
872 val = random.uniform(0, total)
873 for (action, prob) in actions:
874 if val < prob:
875 return action
876 val -= prob
877 return None
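
# Illustrative sketch of the weighted selection above (weights hypothetical):
# with actions = [(out_osd, 1.0), (kill_osd, 0.4), (in_osd, 1.7)], total is
# 3.1 and val = random.uniform(0, 3.1); val in [0, 1.0) returns out_osd,
# [1.0, 1.4) returns kill_osd and [1.4, 3.1) returns in_osd, so each action
# is chosen with probability proportional to its weight.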
878
879 def log_exc(func):
880 @wraps(func)
881 def wrapper(self):
882 try:
883 return func(self)
884 except:
885 self.log(traceback.format_exc())
886 raise
887 return wrapper
888
889 @log_exc
890 def do_sighup(self):
891 """
892 Loops and sends signal.SIGHUP to a random live osd.
893
894 Loop delay is controlled by the config value sighup_delay.
895 """
896 delay = float(self.sighup_delay)
897 self.log("starting do_sighup with a delay of {0}".format(delay))
898 while not self.stopping:
899 osd = random.choice(self.live_osds)
900 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
901 time.sleep(delay)
902
903 @log_exc
904 def do_optrack_toggle(self):
905 """
906         Loops and toggles op tracking on all osds.
907
908 Loop delay is controlled by the config value optrack_toggle_delay.
909 """
910 delay = float(self.optrack_toggle_delay)
911 osd_state = "true"
912 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
913 while not self.stopping:
914 if osd_state == "true":
915 osd_state = "false"
916 else:
917 osd_state = "true"
918 try:
919 self.ceph_manager.inject_args('osd', '*',
920 'osd_enable_op_tracker',
921 osd_state)
922 except CommandFailedError:
923 self.log('Failed to tell all osds, ignoring')
924 gevent.sleep(delay)
925
926 @log_exc
927 def do_dump_ops(self):
928 """
929 Loops and does op dumps on all osds
930 """
931 self.log("starting do_dump_ops")
932 while not self.stopping:
933 for osd in self.live_osds:
934 # Ignore errors because live_osds is in flux
935 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
936 check_status=False, timeout=30, stdout=DEVNULL)
937 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
938 check_status=False, timeout=30, stdout=DEVNULL)
939 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
940 check_status=False, timeout=30, stdout=DEVNULL)
941 gevent.sleep(0)
942
943 @log_exc
944 def do_noscrub_toggle(self):
945 """
946         Loops and toggles the noscrub/nodeep-scrub flags
947
948 Loop delay is controlled by the config value noscrub_toggle_delay.
949 """
950 delay = float(self.noscrub_toggle_delay)
951 scrub_state = "none"
952 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
953 while not self.stopping:
954 if scrub_state == "none":
955 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
956 scrub_state = "noscrub"
957 elif scrub_state == "noscrub":
958 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
959 scrub_state = "both"
960 elif scrub_state == "both":
961 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
962 scrub_state = "nodeep-scrub"
963 else:
964 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
965 scrub_state = "none"
966 gevent.sleep(delay)
967 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
968 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
969
970 @log_exc
971 def do_thrash(self):
972 """
973 Loop to select random actions to thrash ceph manager with.
974 """
975 cleanint = self.config.get("clean_interval", 60)
976 scrubint = self.config.get("scrub_interval", -1)
977 maxdead = self.config.get("max_dead", 0)
978 delay = self.config.get("op_delay", 5)
979 self.rerrosd = self.live_osds[0]
980 if self.random_eio > 0:
981 self.ceph_manager.inject_args('osd', self.rerrosd,
982 'filestore_debug_random_read_err',
983 self.random_eio)
984 self.ceph_manager.inject_args('osd', self.rerrosd,
985 'bluestore_debug_random_read_err',
986 self.random_eio)
987 self.log("starting do_thrash")
988 while not self.stopping:
989 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
990 "out_osds: ", self.out_osds,
991 "dead_osds: ", self.dead_osds,
992 "live_osds: ", self.live_osds]]
993 self.log(" ".join(to_log))
994 if random.uniform(0, 1) < (float(delay) / cleanint):
995 while len(self.dead_osds) > maxdead:
996 self.revive_osd()
997 for osd in self.in_osds:
998 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
999 str(osd), str(1))
1000 if random.uniform(0, 1) < float(
1001 self.config.get('chance_test_map_discontinuity', 0)) \
1002 and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1003 self.test_map_discontinuity()
1004 else:
1005 self.ceph_manager.wait_for_recovery(
1006 timeout=self.config.get('timeout')
1007 )
1008 time.sleep(self.clean_wait)
1009 if scrubint > 0:
1010 if random.uniform(0, 1) < (float(delay) / scrubint):
1011 self.log('Scrubbing while thrashing being performed')
1012 Scrubber(self.ceph_manager, self.config)
1013 self.choose_action()()
1014 time.sleep(delay)
1015 self.all_up()
1016 if self.random_eio > 0:
1017 self.ceph_manager.inject_args('osd', self.rerrosd,
1018 'filestore_debug_random_read_err', '0.0')
1019 self.ceph_manager.inject_args('osd', self.rerrosd,
1020 'bluestore_debug_random_read_err', '0.0')
1021 for pool in list(self.pools_to_fix_pgp_num):
1022 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1023 self.fix_pgp_num(pool)
1024 self.pools_to_fix_pgp_num.clear()
1025 for service, opt, saved_value in self.saved_options:
1026 self.ceph_manager.inject_args(service, '*', opt, saved_value)
1027 self.saved_options = []
1028 self.all_up_in()
1029
1030
1031 class ObjectStoreTool:
1032
1033 def __init__(self, manager, pool, **kwargs):
1034 self.manager = manager
1035 self.pool = pool
1036 self.osd = kwargs.get('osd', None)
1037 self.object_name = kwargs.get('object_name', None)
1038 self.do_revive = kwargs.get('do_revive', True)
1039 if self.osd and self.pool and self.object_name:
1040 if self.osd == "primary":
1041 self.osd = self.manager.get_object_primary(self.pool,
1042 self.object_name)
1043 assert self.osd
1044 if self.object_name:
1045 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1046 self.object_name,
1047 self.osd)
1048 self.remote = self.manager.ctx.\
1049 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1050 path = self.manager.get_filepath().format(id=self.osd)
1051 self.paths = ("--data-path {path} --journal-path {path}/journal".
1052 format(path=path))
1053
1054 def build_cmd(self, options, args, stdin):
1055 lines = []
1056 if self.object_name:
1057 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1058 "{paths} --pgid {pgid} --op list |"
1059 "grep '\"oid\":\"{name}\"')".
1060 format(paths=self.paths,
1061 pgid=self.pgid,
1062 name=self.object_name))
1063 args = '"$object" ' + args
1064 options += " --pgid {pgid}".format(pgid=self.pgid)
1065 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1066 format(paths=self.paths,
1067 args=args,
1068 options=options))
1069 if stdin:
1070 cmd = ("echo {payload} | base64 --decode | {cmd}".
1071                    format(payload=base64.b64encode(stdin),
1072 cmd=cmd))
1073 lines.append(cmd)
1074 return "\n".join(lines)
1075
1076 def run(self, options, args, stdin=None, stdout=None):
1077 if stdout is None:
1078 stdout = StringIO()
1079 self.manager.kill_osd(self.osd)
1080 cmd = self.build_cmd(options, args, stdin)
1081 self.manager.log(cmd)
1082 try:
1083 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1084 check_status=False,
1085 stdout=stdout,
1086 stderr=StringIO())
1087 proc.wait()
1088 if proc.exitstatus != 0:
1089 self.manager.log("failed with " + str(proc.exitstatus))
1090 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1091 raise Exception(error)
1092 finally:
1093 if self.do_revive:
1094 self.manager.revive_osd(self.osd)
1095 self.manager.wait_till_osd_is_up(self.osd, 300)
1096
1097
1098 class CephManager:
1099 """
1100 Ceph manager object.
1101 Contains several local functions that form a bulk of this module.
1102
1103 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1104 the same name.
1105 """
1106
1107 REPLICATED_POOL = 1
1108 ERASURE_CODED_POOL = 3
1109
1110 def __init__(self, controller, ctx=None, config=None, logger=None,
1111 cluster='ceph'):
1112 self.lock = threading.RLock()
1113 self.ctx = ctx
1114 self.config = config
1115 self.controller = controller
1116 self.next_pool_id = 0
1117 self.cluster = cluster
1118 if (logger):
1119 self.log = lambda x: logger.info(x)
1120 else:
1121 def tmp(x):
1122 """
1123 implement log behavior.
1124 """
1125 print x
1126 self.log = tmp
1127 if self.config is None:
1128 self.config = dict()
1129 pools = self.list_pools()
1130 self.pools = {}
1131 for pool in pools:
1132 # we may race with a pool deletion; ignore failures here
1133 try:
1134 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1135 except CommandFailedError:
1136 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1137
1138 def raw_cluster_cmd(self, *args):
1139 """
1140         Run a ceph command against the cluster and return its stdout.
1141 """
1142 testdir = teuthology.get_testdir(self.ctx)
1143 ceph_args = [
1144 'sudo',
1145 'adjust-ulimits',
1146 'ceph-coverage',
1147 '{tdir}/archive/coverage'.format(tdir=testdir),
1148 'timeout',
1149 '120',
1150 'ceph',
1151 '--cluster',
1152 self.cluster,
1153 ]
1154 ceph_args.extend(args)
1155 proc = self.controller.run(
1156 args=ceph_args,
1157 stdout=StringIO(),
1158 )
1159 return proc.stdout.getvalue()
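
# Illustrative usage sketch (hypothetical caller and arguments):
#
#   out = manager.raw_cluster_cmd('osd', 'dump', '--format=json')
#   osd_dump = json.loads(out)
#
# the *args are appended verbatim to the wrapped
# `ceph --cluster <cluster>` command line and stdout is returned as a string.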
1160
1161 def raw_cluster_cmd_result(self, *args, **kwargs):
1162 """
1163         Run a ceph command against the cluster and return its exit status.
1164 """
1165 testdir = teuthology.get_testdir(self.ctx)
1166 ceph_args = [
1167 'sudo',
1168 'adjust-ulimits',
1169 'ceph-coverage',
1170 '{tdir}/archive/coverage'.format(tdir=testdir),
1171 'timeout',
1172 '120',
1173 'ceph',
1174 '--cluster',
1175 self.cluster,
1176 ]
1177 ceph_args.extend(args)
1178 kwargs['args'] = ceph_args
1179 kwargs['check_status'] = False
1180 proc = self.controller.run(**kwargs)
1181 return proc.exitstatus
1182
1183 def run_ceph_w(self, watch_channel=None):
1184 """
1185 Execute "ceph -w" in the background with stdout connected to a StringIO,
1186 and return the RemoteProcess.
1187
1188 :param watch_channel: Specifies the channel to be watched. This can be
1189 'cluster', 'audit', ...
1190 :type watch_channel: str
1191 """
1192 args = ["sudo",
1193 "daemon-helper",
1194 "kill",
1195 "ceph",
1196 '--cluster',
1197 self.cluster,
1198 "-w"]
1199 if watch_channel is not None:
1200 args.append("--watch-channel")
1201 args.append(watch_channel)
1202 return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
1203
1204 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1205 """
1206 Flush pg stats from a list of OSD ids, ensuring they are reflected
1207 all the way to the monitor. Luminous and later only.
1208
1209 :param osds: list of OSDs to flush
1210 :param no_wait: list of OSDs not to wait for seq id. by default, we
1211 wait for all specified osds, but some of them could be
1212 moved out of osdmap, so we cannot get their updated
1213 stat seq from monitor anymore. in that case, you need
1214 to pass a blacklist.
1215 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1216 it. (5 min by default)
1217 """
1218 seq = {osd: int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
1219 for osd in osds}
1220 if not wait_for_mon:
1221 return
1222 if no_wait is None:
1223 no_wait = []
1224 for osd, need in seq.iteritems():
1225 if osd in no_wait:
1226 continue
1227 got = 0
1228 while wait_for_mon > 0:
1229 got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
1230 self.log('need seq {need} got {got} for osd.{osd}'.format(
1231 need=need, got=got, osd=osd))
1232 if got >= need:
1233 break
1234 A_WHILE = 1
1235 time.sleep(A_WHILE)
1236 wait_for_mon -= A_WHILE
1237 else:
1238 raise Exception('timed out waiting for mon to be updated with '
1239 'osd.{osd}: {got} < {need}'.
1240 format(osd=osd, got=got, need=need))
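
# Illustrative usage sketch (hypothetical osd ids):
#
#   manager.flush_pg_stats([0, 1, 2], no_wait=[2])
#
# tells osd.0-2 to flush their pg stats, then polls `osd last-stat-seq` for
# osd.0 and osd.1 until the monitor reports the flushed sequence numbers;
# osd.2 is skipped because it is in the no_wait list.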
1241
1242 def flush_all_pg_stats(self):
1243 self.flush_pg_stats(range(len(self.get_osd_dump())))
1244
1245 def do_rados(self, remote, cmd, check_status=True):
1246 """
1247 Execute a remote rados command.
1248 """
1249 testdir = teuthology.get_testdir(self.ctx)
1250 pre = [
1251 'adjust-ulimits',
1252 'ceph-coverage',
1253 '{tdir}/archive/coverage'.format(tdir=testdir),
1254 'rados',
1255 '--cluster',
1256 self.cluster,
1257 ]
1258 pre.extend(cmd)
1259 proc = remote.run(
1260 args=pre,
1261 wait=True,
1262 check_status=check_status
1263 )
1264 return proc
1265
1266 def rados_write_objects(self, pool, num_objects, size,
1267 timelimit, threads, cleanup=False):
1268 """
1269 Write rados objects
1270 Threads not used yet.
1271 """
1272 args = [
1273 '-p', pool,
1274 '--num-objects', num_objects,
1275 '-b', size,
1276 'bench', timelimit,
1277 'write'
1278 ]
1279 if not cleanup:
1280 args.append('--no-cleanup')
1281 return self.do_rados(self.controller, map(str, args))
1282
1283 def do_put(self, pool, obj, fname, namespace=None):
1284 """
1285 Implement rados put operation
1286 """
1287 args = ['-p', pool]
1288 if namespace is not None:
1289 args += ['-N', namespace]
1290 args += [
1291 'put',
1292 obj,
1293 fname
1294 ]
1295 return self.do_rados(
1296 self.controller,
1297 args,
1298 check_status=False
1299 ).exitstatus
1300
1301 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1302 """
1303 Implement rados get operation
1304 """
1305 args = ['-p', pool]
1306 if namespace is not None:
1307 args += ['-N', namespace]
1308 args += [
1309 'get',
1310 obj,
1311 fname
1312 ]
1313 return self.do_rados(
1314 self.controller,
1315 args,
1316 check_status=False
1317 ).exitstatus
1318
1319 def do_rm(self, pool, obj, namespace=None):
1320 """
1321 Implement rados rm operation
1322 """
1323 args = ['-p', pool]
1324 if namespace is not None:
1325 args += ['-N', namespace]
1326 args += [
1327 'rm',
1328 obj
1329 ]
1330 return self.do_rados(
1331 self.controller,
1332 args,
1333 check_status=False
1334 ).exitstatus
1335
1336 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1337 if stdout is None:
1338 stdout = StringIO()
1339 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1340
1341 def find_remote(self, service_type, service_id):
1342 """
1343 Get the Remote for the host where a particular service runs.
1344
1345 :param service_type: 'mds', 'osd', 'client'
1346 :param service_id: The second part of a role, e.g. '0' for
1347 the role 'client.0'
1348 :return: a Remote instance for the host where the
1349 requested role is placed
1350 """
1351 return get_remote(self.ctx, self.cluster,
1352 service_type, service_id)
1353
1354 def admin_socket(self, service_type, service_id,
1355 command, check_status=True, timeout=0, stdout=None):
1356 """
1357 Remotely start up ceph specifying the admin socket
1358 :param command: a list of words to use as the command
1359 to the admin socket
1360 """
1361 if stdout is None:
1362 stdout = StringIO()
1363 testdir = teuthology.get_testdir(self.ctx)
1364 remote = self.find_remote(service_type, service_id)
1365 args = [
1366 'sudo',
1367 'adjust-ulimits',
1368 'ceph-coverage',
1369 '{tdir}/archive/coverage'.format(tdir=testdir),
1370 'timeout',
1371 str(timeout),
1372 'ceph',
1373 '--cluster',
1374 self.cluster,
1375 '--admin-daemon',
1376 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1377 cluster=self.cluster,
1378 type=service_type,
1379 id=service_id),
1380 ]
1381 args.extend(command)
1382 return remote.run(
1383 args=args,
1384 stdout=stdout,
1385 wait=True,
1386 check_status=check_status
1387 )
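
# Illustrative usage sketch (hypothetical daemon id and command):
#
#   manager.admin_socket('osd', 2, ['dump_ops_in_flight'])
#
# runs, on the remote hosting osd.2, roughly:
#   sudo ... ceph --cluster ceph --admin-daemon \
#       /var/run/ceph/ceph-osd.2.asok dump_ops_in_flight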
1388
1389 def objectstore_tool(self, pool, options, args, **kwargs):
1390 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1391
1392 def get_pgid(self, pool, pgnum):
1393 """
1394 :param pool: pool name
1395 :param pgnum: pg number
1396 :returns: a string representing this pg.
1397 """
1398 poolnum = self.get_pool_num(pool)
1399 pg_str = "{poolnum}.{pgnum}".format(
1400 poolnum=poolnum,
1401 pgnum=pgnum)
1402 return pg_str
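
# Illustrative sketch (hypothetical pool number): if pool 'unique_pool_0'
# maps to pool number 2, get_pgid('unique_pool_0', 7) returns '2.7'.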
1403
1404 def get_pg_replica(self, pool, pgnum):
1405 """
1406         get replica for pool, pgnum (e.g. (data, 0) -> 0)
1407 """
1408 pg_str = self.get_pgid(pool, pgnum)
1409 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1410 j = json.loads('\n'.join(output.split('\n')[1:]))
1411 return int(j['acting'][-1])
1412 assert False
1413
1414 def wait_for_pg_stats(func):
1415 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1416 # by default, and take the faulty injection in ms into consideration,
1417 # 12 seconds are more than enough
1418 delays = [1, 1, 2, 3, 5, 8, 13, 0]
1419 @wraps(func)
1420 def wrapper(self, *args, **kwargs):
1421 exc = None
1422 for delay in delays:
1423 try:
1424 return func(self, *args, **kwargs)
1425 except AssertionError as e:
1426 time.sleep(delay)
1427 exc = e
1428 raise exc
1429 return wrapper
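
# Illustrative sketch: a hypothetical method decorated with
# @wait_for_pg_stats is attempted up to eight times, sleeping 1, 1, 2, 3,
# 5, 8, 13 and finally 0 seconds after each AssertionError, after which
# the last AssertionError is re-raised.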
1430
1431 def get_pg_primary(self, pool, pgnum):
1432 """
1433         get primary for pool, pgnum (e.g. (data, 0) -> 0)
1434 """
1435 pg_str = self.get_pgid(pool, pgnum)
1436 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1437 j = json.loads('\n'.join(output.split('\n')[1:]))
1438 return int(j['acting'][0])
1439 assert False
1440
1441 def get_pool_num(self, pool):
1442 """
1443 get number for pool (e.g., data -> 2)
1444 """
1445 return int(self.get_pool_dump(pool)['pool'])
1446
1447 def list_pools(self):
1448 """
1449 list all pool names
1450 """
1451 osd_dump = self.get_osd_dump_json()
1452 self.log(osd_dump['pools'])
1453 return [str(i['pool_name']) for i in osd_dump['pools']]
1454
1455 def clear_pools(self):
1456 """
1457 remove all pools
1458 """
1459 [self.remove_pool(i) for i in self.list_pools()]
1460
1461 def kick_recovery_wq(self, osdnum):
1462 """
1463 Run kick_recovery_wq on cluster.
1464 """
1465 return self.raw_cluster_cmd(
1466 'tell', "osd.%d" % (int(osdnum),),
1467 'debug',
1468 'kick_recovery_wq',
1469 '0')
1470
1471 def wait_run_admin_socket(self, service_type,
1472 service_id, args=['version'], timeout=75, stdout=None):
1473 """
1474 If osd_admin_socket call succeeds, return. Otherwise wait
1475 five seconds and try again.
1476 """
1477 if stdout is None:
1478 stdout = StringIO()
1479 tries = 0
1480 while True:
1481 proc = self.admin_socket(service_type, service_id,
1482 args, check_status=False, stdout=stdout)
1483             if proc.exitstatus == 0:
1484 return proc
1485 else:
1486 tries += 1
1487 if (tries * 5) > timeout:
1488 raise Exception('timed out waiting for admin_socket '
1489 'to appear after {type}.{id} restart'.
1490 format(type=service_type,
1491 id=service_id))
1492 self.log("waiting on admin_socket for {type}-{id}, "
1493 "{command}".format(type=service_type,
1494 id=service_id,
1495 command=args))
1496 time.sleep(5)
1497
1498 def get_pool_dump(self, pool):
1499 """
1500 get the osd dump part of a pool
1501 """
1502 osd_dump = self.get_osd_dump_json()
1503 for i in osd_dump['pools']:
1504 if i['pool_name'] == pool:
1505 return i
1506 assert False
1507
1508 def get_config(self, service_type, service_id, name):
1509 """
1510         :param service_type, service_id: daemon to query, e.g. 'mon', 'a'
1511 :param name: the option name
1512 """
1513 proc = self.wait_run_admin_socket(service_type, service_id,
1514 ['config', 'show'])
1515 j = json.loads(proc.stdout.getvalue())
1516 return j[name]
1517
1518 def inject_args(self, service_type, service_id, name, value):
1519 whom = '{0}.{1}'.format(service_type, service_id)
1520 if isinstance(value, bool):
1521 value = 'true' if value else 'false'
1522 opt_arg = '--{name}={value}'.format(name=name, value=value)
1523 self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
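
# Illustrative usage sketch (hypothetical option and value):
#
#   manager.inject_args('osd', '*', 'osd_max_backfills', 2)
#
# expands to `ceph -- tell osd.* injectargs --osd_max_backfills=2`;
# boolean values are converted to the strings 'true'/'false' first.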
1524
1525 def set_config(self, osdnum, **argdict):
1526 """
1527 :param osdnum: osd number
1528 :param argdict: dictionary containing values to set.
1529 """
1530 for k, v in argdict.iteritems():
1531 self.wait_run_admin_socket(
1532 'osd', osdnum,
1533 ['config', 'set', str(k), str(v)])
1534
1535 def raw_cluster_status(self):
1536 """
1537 Get status from cluster
1538 """
1539 status = self.raw_cluster_cmd('status', '--format=json-pretty')
1540 return json.loads(status)
1541
1542 def raw_osd_status(self):
1543 """
1544 Get osd status from cluster
1545 """
1546 return self.raw_cluster_cmd('osd', 'dump')
1547
1548 def get_osd_status(self):
1549 """
1550 Get osd statuses sorted by states that the osds are in.
1551 """
1552 osd_lines = filter(
1553 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1554 self.raw_osd_status().split('\n'))
1555 self.log(osd_lines)
1556 in_osds = [int(i[4:].split()[0])
1557 for i in filter(lambda x: " in " in x, osd_lines)]
1558 out_osds = [int(i[4:].split()[0])
1559 for i in filter(lambda x: " out " in x, osd_lines)]
1560 up_osds = [int(i[4:].split()[0])
1561 for i in filter(lambda x: " up " in x, osd_lines)]
1562 down_osds = [int(i[4:].split()[0])
1563 for i in filter(lambda x: " down " in x, osd_lines)]
1564 dead_osds = [int(x.id_)
1565 for x in filter(lambda x:
1566 not x.running(),
1567 self.ctx.daemons.
1568 iter_daemons_of_role('osd', self.cluster))]
1569 live_osds = [int(x.id_) for x in
1570 filter(lambda x:
1571 x.running(),
1572 self.ctx.daemons.iter_daemons_of_role('osd',
1573 self.cluster))]
1574 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1575 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1576 'raw': osd_lines}
1577
1578 def get_num_pgs(self):
1579 """
1580 Check cluster status for the number of pgs
1581 """
1582 status = self.raw_cluster_status()
1583 self.log(status)
1584 return status['pgmap']['num_pgs']
1585
1586 def create_erasure_code_profile(self, profile_name, profile):
1587 """
1588 Create an erasure code profile name that can be used as a parameter
1589 when creating an erasure coded pool.
1590 """
1591 with self.lock:
1592 args = cmd_erasure_code_profile(profile_name, profile)
1593 self.raw_cluster_cmd(*args)
1594
1595 def create_pool_with_unique_name(self, pg_num=16,
1596 erasure_code_profile_name=None,
1597 min_size=None,
1598 erasure_code_use_overwrites=False):
1599 """
1600 Create a pool named unique_pool_X where X is unique.
1601 """
1602 name = ""
1603 with self.lock:
1604 name = "unique_pool_%s" % (str(self.next_pool_id),)
1605 self.next_pool_id += 1
1606 self.create_pool(
1607 name,
1608 pg_num,
1609 erasure_code_profile_name=erasure_code_profile_name,
1610 min_size=min_size,
1611 erasure_code_use_overwrites=erasure_code_use_overwrites)
1612 return name
1613
1614 @contextlib.contextmanager
1615 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1616 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1617 yield
1618 self.remove_pool(pool_name)
1619
1620 def create_pool(self, pool_name, pg_num=16,
1621 erasure_code_profile_name=None,
1622 min_size=None,
1623 erasure_code_use_overwrites=False):
1624 """
1625 Create a pool named from the pool_name parameter.
1626 :param pool_name: name of the pool being created.
1627 :param pg_num: initial number of pgs.
1628 :param erasure_code_profile_name: if set and !None create an
1629 erasure coded pool using the profile
1630 :param erasure_code_use_overwrites: if true, allow overwrites
1631 """
1632 with self.lock:
1633 assert isinstance(pool_name, basestring)
1634 assert isinstance(pg_num, int)
1635 assert pool_name not in self.pools
1636 self.log("creating pool_name %s" % (pool_name,))
1637 if erasure_code_profile_name:
1638 self.raw_cluster_cmd('osd', 'pool', 'create',
1639 pool_name, str(pg_num), str(pg_num),
1640 'erasure', erasure_code_profile_name)
1641 else:
1642 self.raw_cluster_cmd('osd', 'pool', 'create',
1643 pool_name, str(pg_num))
1644 if min_size is not None:
1645 self.raw_cluster_cmd(
1646 'osd', 'pool', 'set', pool_name,
1647 'min_size',
1648 str(min_size))
1649 if erasure_code_use_overwrites:
1650 self.raw_cluster_cmd(
1651 'osd', 'pool', 'set', pool_name,
1652 'allow_ec_overwrites',
1653 'true')
1654 self.raw_cluster_cmd(
1655 'osd', 'pool', 'application', 'enable',
1656 pool_name, 'rados', '--yes-i-really-mean-it',
1657 run.Raw('||'), 'true')
1658 self.pools[pool_name] = pg_num
1659 time.sleep(1)
1660
1661 def add_pool_snap(self, pool_name, snap_name):
1662 """
1663 Add pool snapshot
1664 :param pool_name: name of pool to snapshot
1665 :param snap_name: name of snapshot to take
1666 """
1667 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1668 str(pool_name), str(snap_name))
1669
1670 def remove_pool_snap(self, pool_name, snap_name):
1671 """
1672 Remove pool snapshot
1673 :param pool_name: name of pool to snapshot
1674 :param snap_name: name of snapshot to remove
1675 """
1676 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1677 str(pool_name), str(snap_name))
1678
1679 def remove_pool(self, pool_name):
1680 """
1681 Remove the indicated pool
1682 :param pool_name: Pool to be removed
1683 """
1684 with self.lock:
1685 assert isinstance(pool_name, basestring)
1686 assert pool_name in self.pools
1687 self.log("removing pool_name %s" % (pool_name,))
1688 del self.pools[pool_name]
1689 self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
1690 "--yes-i-really-really-mean-it")
1691
1692 def get_pool(self):
1693 """
1694 Pick a random pool
1695 """
1696 with self.lock:
1697 return random.choice(self.pools.keys())
1698
1699 def get_pool_pg_num(self, pool_name):
1700 """
1701 Return the number of pgs in the pool specified.
1702 """
1703 with self.lock:
1704 assert isinstance(pool_name, basestring)
1705 if pool_name in self.pools:
1706 return self.pools[pool_name]
1707 return 0
1708
1709 def get_pool_property(self, pool_name, prop):
1710 """
1711 :param pool_name: pool
1712 :param prop: property to be checked.
1713 :returns: property as an int value.
1714 """
1715 with self.lock:
1716 assert isinstance(pool_name, basestring)
1717 assert isinstance(prop, basestring)
1718 output = self.raw_cluster_cmd(
1719 'osd',
1720 'pool',
1721 'get',
1722 pool_name,
1723 prop)
1724 return int(output.split()[1])
1725
1726 def set_pool_property(self, pool_name, prop, val):
1727 """
1728 :param pool_name: pool
1729 :param prop: property to be set.
1730 :param val: value to set.
1731
1732 This routine retries if set operation fails.
1733 """
1734 with self.lock:
1735 assert isinstance(pool_name, basestring)
1736 assert isinstance(prop, basestring)
1737 assert isinstance(val, int)
1738 tries = 0
1739 while True:
1740 r = self.raw_cluster_cmd_result(
1741 'osd',
1742 'pool',
1743 'set',
1744 pool_name,
1745 prop,
1746 str(val))
1747 if r != 11: # EAGAIN
1748 break
1749 tries += 1
1750 if tries > 50:
1751 raise Exception('timed out getting EAGAIN '
1752 'when setting pool property %s %s = %s' %
1753 (pool_name, prop, val))
1754 self.log('got EAGAIN setting pool property, '
1755 'waiting a few seconds...')
1756 time.sleep(2)
1757
1758 def expand_pool(self, pool_name, by, max_pgs):
1759 """
1760 Increase the number of pgs in a pool
1761 """
1762 with self.lock:
1763 assert isinstance(pool_name, basestring)
1764 assert isinstance(by, int)
1765 assert pool_name in self.pools
1766 if self.get_num_creating() > 0:
1767 return False
1768 if (self.pools[pool_name] + by) > max_pgs:
1769 return False
1770 self.log("increase pool size by %d" % (by,))
1771 new_pg_num = self.pools[pool_name] + by
1772 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1773 self.pools[pool_name] = new_pg_num
1774 return True
1775
1776 def contract_pool(self, pool_name, by, min_pgs):
1777 """
1778 Decrease the number of pgs in a pool
1779 """
1780 with self.lock:
1781 self.log('contract_pool %s by %s min %s' % (
1782 pool_name, str(by), str(min_pgs)))
1783 assert isinstance(pool_name, basestring)
1784 assert isinstance(by, int)
1785 assert pool_name in self.pools
1786 if self.get_num_creating() > 0:
1787 self.log('too many creating')
1788 return False
1789 proj = self.pools[pool_name] - by
1790 if proj < min_pgs:
1791 self.log('would drop below min_pgs, proj %d, currently %d' % (proj, self.pools[pool_name]))
1792 return False
1793 self.log("decrease pool size by %d" % (by,))
1794 new_pg_num = self.pools[pool_name] - by
1795 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1796 self.pools[pool_name] = new_pg_num
1797 return True
1798
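# Example (illustrative only): growing and shrinking a pool while
# keeping pg counts within bounds; both helpers return False (and do
# nothing) when pgs are still being created or the bound would be
# violated (pool name and bounds are hypothetical):
#
#   if mgr.expand_pool('unittest_pool', by=8, max_pgs=256):
#       mgr.set_pool_pgpnum('unittest_pool', force=False)
#   mgr.contract_pool('unittest_pool', by=8, min_pgs=8)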
1799 def stop_pg_num_changes(self):
1800 """
1801 Reset all pg_num_targets back to pg_num, canceling splits and merges
1802 """
1803 self.log('Canceling any pending splits or merges...')
1804 osd_dump = self.get_osd_dump_json()
1805 for pool in osd_dump['pools']:
1806 if pool['pg_num'] != pool['pg_num_target']:
1807 self.log('Setting pool %s (%d) pg_num %d -> %d' %
1808 (pool['pool_name'], pool['pool'],
1809 pool['pg_num_target'],
1810 pool['pg_num']))
1811 self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
1812 'pg_num', str(pool['pg_num']))
1813
1814 def set_pool_pgpnum(self, pool_name, force):
1815 """
1816 Set pgpnum property of pool_name pool.
1817 """
1818 with self.lock:
1819 assert isinstance(pool_name, basestring)
1820 assert pool_name in self.pools
1821 if not force and self.get_num_creating() > 0:
1822 return False
1823 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1824 return True
1825
1826 def list_pg_unfound(self, pgid):
1827 """
1828 return list of unfound pgs with the id specified
1829 """
1830 r = None
1831 offset = {}
1832 while True:
1833 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
1834 json.dumps(offset))
1835 j = json.loads(out)
1836 if r is None:
1837 r = j
1838 else:
1839 r['objects'].extend(j['objects'])
1840 if 'more' not in j:
1841 break
1842 if j['more'] == 0:
1843 break
1844 offset = j['objects'][-1]['oid']
1845 if 'more' in r:
1846 del r['more']
1847 return r
1848
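# Example (illustrative only): list_pg_unfound() pages through
# ``pg <pgid> list_unfound`` and concatenates the per-page 'objects'
# lists, dropping the 'more' cursor from the merged result (the pgid
# below is hypothetical):
#
#   unfound = mgr.list_pg_unfound('2.0')
#   for obj in unfound['objects']:
#       log.info('unfound object: %s', obj)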
1849 def get_pg_stats(self):
1850 """
1851 Dump the cluster and get pg stats
1852 """
1853 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
1854 j = json.loads('\n'.join(out.split('\n')[1:]))
1855 try:
1856 return j['pg_map']['pg_stats']
1857 except KeyError:
1858 return j['pg_stats']
1859
1860 def get_pgids_to_force(self, backfill):
1861 """
1862 Return a random subset of the PGs whose recovery or backfill can be forced
1863 """
1864 j = self.get_pg_stats()
1865 pgids = []
1866 if backfill:
1867 wanted = ['degraded', 'backfilling', 'backfill_wait']
1868 else:
1869 wanted = ['recovering', 'degraded', 'recovery_wait']
1870 for pg in j:
1871 status = pg['state'].split('+')
1872 for t in wanted:
1873 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
1874 pgids.append(pg['pgid'])
1875 break
1876 return pgids
1877
1878 def get_pgids_to_cancel_force(self, backfill):
1879 """
1880 Return the randomized list of PGs whose recovery/backfill priority is forced
1881 """
1882 j = self.get_pg_stats()
1883 pgids = []
1884 if backfill:
1885 wanted = 'forced_backfill'
1886 else:
1887 wanted = 'forced_recovery'
1888 for pg in j:
1889 status = pg['state'].split('+')
1890 if wanted in status and random.random() > 0.5:
1891 pgids.append(pg['pgid'])
1892 return pgids
1893
1894 def compile_pg_status(self):
1895 """
1896 Return a histogram of pg state values
1897 """
1898 ret = {}
1899 j = self.get_pg_stats()
1900 for pg in j:
1901 for status in pg['state'].split('+'):
1902 if status not in ret:
1903 ret[status] = 0
1904 ret[status] += 1
1905 return ret
1906
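# Example (illustrative only): compile_pg_status() splits each pg
# state on '+' and counts the tokens, so a mostly healthy cluster
# might yield something like (values made up):
#
#   {'active': 34, 'clean': 32, 'scrubbing': 1, 'snaptrim': 1}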
1907 @wait_for_pg_stats
1908 def with_pg_state(self, pool, pgnum, check):
1909 pgstr = self.get_pgid(pool, pgnum)
1910 stats = self.get_single_pg_stats(pgstr)
1911 assert check(stats['state'])
1912
1913 @wait_for_pg_stats
1914 def with_pg(self, pool, pgnum, check):
1915 pgstr = self.get_pgid(pool, pgnum)
1916 stats = self.get_single_pg_stats(pgstr)
1917 return check(stats)
1918
1919 def get_last_scrub_stamp(self, pool, pgnum):
1920 """
1921 Get the timestamp of the last scrub.
1922 """
1923 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
1924 return stats["last_scrub_stamp"]
1925
1926 def do_pg_scrub(self, pool, pgnum, stype):
1927 """
1928 Scrub pg and wait for scrubbing to finish
1929 """
1930 init = self.get_last_scrub_stamp(pool, pgnum)
1931 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
1932 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1933 SLEEP_TIME = 10
1934 timer = 0
1935 while init == self.get_last_scrub_stamp(pool, pgnum):
1936 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1937 self.log("waiting for scrub type %s" % (stype,))
1938 if (timer % RESEND_TIMEOUT) == 0:
1939 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1940 # The first time in this loop is the actual request
1941 if timer != 0 and stype == "repair":
1942 self.log("WARNING: Resubmitted a non-idempotent repair")
1943 time.sleep(SLEEP_TIME)
1944 timer += SLEEP_TIME
1945
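# Example (illustrative only): do_pg_scrub() blocks until the pg's
# last_scrub_stamp changes; stype is the pg subcommand to issue,
# i.e. 'scrub', 'deep-scrub' or 'repair' (pool name and pg number
# are hypothetical):
#
#   mgr.do_pg_scrub('unittest_pool', 0, 'deep-scrub')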
1946 def wait_snap_trimming_complete(self, pool):
1947 """
1948 Wait for snap trimming on pool to end
1949 """
1950 POLL_PERIOD = 10
1951 FATAL_TIMEOUT = 600
1952 start = time.time()
1953 poolnum = self.get_pool_num(pool)
1954 poolnumstr = "%s." % (poolnum,)
1955 while True:
1956 now = time.time()
1957 # give up (and fail the test) if trimming takes too long
1958 assert (now - start) <= FATAL_TIMEOUT, \
1959 'failed to complete snap trimming before timeout'
1960 all_stats = self.get_pg_stats()
1961 trimming = False
1962 for pg in all_stats:
1963 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
1964 self.log("pg {pg} in trimming, state: {state}".format(
1965 pg=pg['pgid'],
1966 state=pg['state']))
1967 trimming = True
1968 if not trimming:
1969 break
1970 self.log("{pool} still trimming, waiting".format(pool=pool))
1971 time.sleep(POLL_PERIOD)
1972
1973 def get_single_pg_stats(self, pgid):
1974 """
1975 Return pg for the pgid specified.
1976 """
1977 all_stats = self.get_pg_stats()
1978
1979 for pg in all_stats:
1980 if pg['pgid'] == pgid:
1981 return pg
1982
1983 return None
1984
1985 def get_object_pg_with_shard(self, pool, name, osdid):
1986 """
1987 """
1988 pool_dump = self.get_pool_dump(pool)
1989 object_map = self.get_object_map(pool, name)
1990 if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1991 shard = object_map['acting'].index(osdid)
1992 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1993 shard=shard)
1994 else:
1995 return object_map['pgid']
1996
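# Example (illustrative only): for a replicated pool the plain pgid
# is returned (e.g. '2.1a'), while for an erasure-coded pool the
# shard held by the given osd is appended (e.g. '2.1as0'); pool,
# object and osd id below are hypothetical:
#
#   pgid = mgr.get_object_pg_with_shard('unittest_pool', 'obj1', 3)
#   primary = mgr.get_object_primary('unittest_pool', 'obj1')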
1997 def get_object_primary(self, pool, name):
1998 """
1999 """
2000 object_map = self.get_object_map(pool, name)
2001 return object_map['acting_primary']
2002
2003 def get_object_map(self, pool, name):
2004 """
2005 osd map --format=json converted to a python object
2006 :returns: the python object
2007 """
2008 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
2009 return json.loads('\n'.join(out.split('\n')[1:]))
2010
2011 def get_osd_dump_json(self):
2012 """
2013 osd dump --format=json converted to a python object
2014 :returns: the python object
2015 """
2016 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
2017 return json.loads('\n'.join(out.split('\n')[1:]))
2018
2019 def get_osd_dump(self):
2020 """
2021 Dump osds
2022 :returns: all osds
2023 """
2024 return self.get_osd_dump_json()['osds']
2025
2026 def get_osd_metadata(self):
2027 """
2028 osd metadata --format=json converted to a python object
2029 :returns: the python object containing osd metadata information
2030 """
2031 out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
2032 return json.loads('\n'.join(out.split('\n')[1:]))
2033
2034 def get_mgr_dump(self):
2035 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2036 return json.loads(out)
2037
2038 def get_stuck_pgs(self, type_, threshold):
2039 """
2040 :returns: stuck pg information from the cluster
2041 """
2042 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2043 '--format=json')
2044 return json.loads(out).get('stuck_pg_stats', [])
2045
2046 def get_num_unfound_objects(self):
2047 """
2048 Check cluster status to get the number of unfound objects
2049 """
2050 status = self.raw_cluster_status()
2051 self.log(status)
2052 return status['pgmap'].get('unfound_objects', 0)
2053
2054 def get_num_creating(self):
2055 """
2056 Find the number of pgs in creating mode.
2057 """
2058 pgs = self.get_pg_stats()
2059 num = 0
2060 for pg in pgs:
2061 if 'creating' in pg['state']:
2062 num += 1
2063 return num
2064
2065 def get_num_active_clean(self):
2066 """
2067 Find the number of active and clean pgs.
2068 """
2069 pgs = self.get_pg_stats()
2070 num = 0
2071 for pg in pgs:
2072 if (pg['state'].count('active') and
2073 pg['state'].count('clean') and
2074 not pg['state'].count('stale')):
2075 num += 1
2076 return num
2077
2078 def get_num_active_recovered(self):
2079 """
2080 Find the number of active and recovered pgs.
2081 """
2082 pgs = self.get_pg_stats()
2083 num = 0
2084 for pg in pgs:
2085 if (pg['state'].count('active') and
2086 not pg['state'].count('recover') and
2087 not pg['state'].count('backfilling') and
2088 not pg['state'].count('stale')):
2089 num += 1
2090 return num
2091
2092 def get_is_making_recovery_progress(self):
2093 """
2094 Return whether there is recovery progress discernible in the
2095 raw cluster status
2096 """
2097 status = self.raw_cluster_status()
2098 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2099 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2100 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2101 return kps > 0 or bps > 0 or ops > 0
2102
2103 def get_num_active(self):
2104 """
2105 Find the number of active pgs.
2106 """
2107 pgs = self.get_pg_stats()
2108 num = 0
2109 for pg in pgs:
2110 if pg['state'].count('active') and not pg['state'].count('stale'):
2111 num += 1
2112 return num
2113
2114 def get_num_down(self):
2115 """
2116 Find the number of pgs that are down.
2117 """
2118 pgs = self.get_pg_stats()
2119 num = 0
2120 for pg in pgs:
2121 if ((pg['state'].count('down') and not
2122 pg['state'].count('stale')) or
2123 (pg['state'].count('incomplete') and not
2124 pg['state'].count('stale'))):
2125 num += 1
2126 return num
2127
2128 def get_num_active_down(self):
2129 """
2130 Find the number of pgs that are either active or down.
2131 """
2132 pgs = self.get_pg_stats()
2133 num = 0
2134 for pg in pgs:
2135 if ((pg['state'].count('active') and not
2136 pg['state'].count('stale')) or
2137 (pg['state'].count('down') and not
2138 pg['state'].count('stale')) or
2139 (pg['state'].count('incomplete') and not
2140 pg['state'].count('stale'))):
2141 num += 1
2142 return num
2143
2144 def is_clean(self):
2145 """
2146 True if all pgs are clean
2147 """
2148 return self.get_num_active_clean() == self.get_num_pgs()
2149
2150 def is_recovered(self):
2151 """
2152 True if all pgs have recovered
2153 """
2154 return self.get_num_active_recovered() == self.get_num_pgs()
2155
2156 def is_active_or_down(self):
2157 """
2158 True if all pgs are active or down
2159 """
2160 return self.get_num_active_down() == self.get_num_pgs()
2161
2162 def wait_for_clean(self, timeout=1200):
2163 """
2164 Returns true when all pgs are clean.
2165 """
2166 self.log("waiting for clean")
2167 start = time.time()
2168 num_active_clean = self.get_num_active_clean()
2169 while not self.is_clean():
2170 if timeout is not None:
2171 if self.get_is_making_recovery_progress():
2172 self.log("making progress, resetting timeout")
2173 start = time.time()
2174 else:
2175 self.log("no progress seen, keeping timeout for now")
2176 if time.time() - start >= timeout:
2177 self.log('dumping pgs')
2178 out = self.raw_cluster_cmd('pg', 'dump')
2179 self.log(out)
2180 assert time.time() - start < timeout, \
2181 'failed to become clean before timeout expired'
2182 cur_active_clean = self.get_num_active_clean()
2183 if cur_active_clean != num_active_clean:
2184 start = time.time()
2185 num_active_clean = cur_active_clean
2186 time.sleep(3)
2187 self.log("clean!")
2188
2189 def are_all_osds_up(self):
2190 """
2191 Returns true if all osds are up.
2192 """
2193 x = self.get_osd_dump()
2194 return (len(x) == sum([(y['up'] > 0) for y in x]))
2195
2196 def wait_for_all_osds_up(self, timeout=None):
2197 """
2198 When this exits, either the timeout has expired, or all
2199 osds are up.
2200 """
2201 self.log("waiting for all up")
2202 start = time.time()
2203 while not self.are_all_osds_up():
2204 if timeout is not None:
2205 assert time.time() - start < timeout, \
2206 'timeout expired in wait_for_all_osds_up'
2207 time.sleep(3)
2208 self.log("all up!")
2209
2210 def pool_exists(self, pool):
2211 if pool in self.list_pools():
2212 return True
2213 return False
2214
2215 def wait_for_pool(self, pool, timeout=300):
2216 """
2217 Wait for a pool to exist
2218 """
2219 self.log('waiting for pool %s to exist' % pool)
2220 start = time.time()
2221 while not self.pool_exists(pool):
2222 if timeout is not None:
2223 assert time.time() - start < timeout, \
2224 'timeout expired in wait_for_pool'
2225 time.sleep(3)
2226
2227 def wait_for_pools(self, pools):
2228 for pool in pools:
2229 self.wait_for_pool(pool)
2230
2231 def is_mgr_available(self):
2232 x = self.get_mgr_dump()
2233 return x.get('available', False)
2234
2235 def wait_for_mgr_available(self, timeout=None):
2236 self.log("waiting for mgr available")
2237 start = time.time()
2238 while not self.is_mgr_available():
2239 if timeout is not None:
2240 assert time.time() - start < timeout, \
2241 'timeout expired in wait_for_mgr_available'
2242 time.sleep(3)
2243 self.log("mgr available!")
2244
2245 def wait_for_recovery(self, timeout=None):
2246 """
2247 Check recovery. When this exits, we have recovered.
2248 """
2249 self.log("waiting for recovery to complete")
2250 start = time.time()
2251 num_active_recovered = self.get_num_active_recovered()
2252 while not self.is_recovered():
2253 now = time.time()
2254 if timeout is not None:
2255 if self.get_is_making_recovery_progress():
2256 self.log("making progress, resetting timeout")
2257 start = time.time()
2258 else:
2259 self.log("no progress seen, keeping timeout for now")
2260 if now - start >= timeout:
2261 if self.is_recovered():
2262 break
2263 self.log('dumping pgs')
2264 out = self.raw_cluster_cmd('pg', 'dump')
2265 self.log(out)
2266 assert now - start < timeout, \
2267 'failed to recover before timeout expired'
2268 cur_active_recovered = self.get_num_active_recovered()
2269 if cur_active_recovered != num_active_recovered:
2270 start = time.time()
2271 num_active_recovered = cur_active_recovered
2272 time.sleep(3)
2273 self.log("recovered!")
2274
2275 def wait_for_active(self, timeout=None):
2276 """
2277 Check peering. When this exits, we are definitely active
2278 """
2279 self.log("waiting for peering to complete")
2280 start = time.time()
2281 num_active = self.get_num_active()
2282 while not self.is_active():
2283 if timeout is not None:
2284 if time.time() - start >= timeout:
2285 self.log('dumping pgs')
2286 out = self.raw_cluster_cmd('pg', 'dump')
2287 self.log(out)
2288 assert time.time() - start < timeout, \
2289 'failed to recover before timeout expired'
2290 cur_active = self.get_num_active()
2291 if cur_active != num_active:
2292 start = time.time()
2293 num_active = cur_active
2294 time.sleep(3)
2295 self.log("active!")
2296
2297 def wait_for_active_or_down(self, timeout=None):
2298 """
2299 Check peering. When this exits, we are definitely either
2300 active or down
2301 """
2302 self.log("waiting for peering to complete or become blocked")
2303 start = time.time()
2304 num_active_down = self.get_num_active_down()
2305 while not self.is_active_or_down():
2306 if timeout is not None:
2307 if time.time() - start >= timeout:
2308 self.log('dumping pgs')
2309 out = self.raw_cluster_cmd('pg', 'dump')
2310 self.log(out)
2311 assert time.time() - start < timeout, \
2312 'failed to recover before timeout expired'
2313 cur_active_down = self.get_num_active_down()
2314 if cur_active_down != num_active_down:
2315 start = time.time()
2316 num_active_down = cur_active_down
2317 time.sleep(3)
2318 self.log("active or down!")
2319
2320 def osd_is_up(self, osd):
2321 """
2322 Wrapper for osd check
2323 """
2324 osds = self.get_osd_dump()
2325 return osds[osd]['up'] > 0
2326
2327 def wait_till_osd_is_up(self, osd, timeout=None):
2328 """
2329 Loop waiting for osd.
2330 """
2331 self.log('waiting for osd.%d to be up' % osd)
2332 start = time.time()
2333 while not self.osd_is_up(osd):
2334 if timeout is not None:
2335 assert time.time() - start < timeout, \
2336 'osd.%d failed to come up before timeout expired' % osd
2337 time.sleep(3)
2338 self.log('osd.%d is up' % osd)
2339
2340 def is_active(self):
2341 """
2342 Wrapper to check if all pgs are active
2343 """
2344 return self.get_num_active() == self.get_num_pgs()
2345
2346 def wait_till_active(self, timeout=None):
2347 """
2348 Wait until all pgs are active.
2349 """
2350 self.log("waiting till active")
2351 start = time.time()
2352 while not self.is_active():
2353 if timeout is not None:
2354 if time.time() - start >= timeout:
2355 self.log('dumping pgs')
2356 out = self.raw_cluster_cmd('pg', 'dump')
2357 self.log(out)
2358 assert time.time() - start < timeout, \
2359 'failed to become active before timeout expired'
2360 time.sleep(3)
2361 self.log("active!")
2362
2363 def wait_till_pg_convergence(self, timeout=None):
2364 start = time.time()
2365 old_stats = None
2366 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2367 if osd['in'] and osd['up']]
2368 while True:
2369 # strictly speaking, there is no need to wait for the mon. but due
2370 # to the "ms inject socket failures" setting, the osdmap could be
2371 # delayed, so the mgr is likely to ignore pg-stat messages for pgs
2372 # serving newly created pools that it does not know about yet. so,
2373 # to make sure the mgr is updated with the latest pg-stats, waiting
2374 # for mon/mgr is necessary.
2375 self.flush_pg_stats(active_osds)
2376 new_stats = dict((stat['pgid'], stat['state'])
2377 for stat in self.get_pg_stats())
2378 if old_stats == new_stats:
2379 return old_stats
2380 if timeout is not None:
2381 assert time.time() - start < timeout, \
2382 'failed to reach convergence before %d secs' % timeout
2383 old_stats = new_stats
2384 # longer than mgr_stats_period
2385 time.sleep(5 + 1)
2386
2387 def mark_out_osd(self, osd):
2388 """
2389 Wrapper to mark osd out.
2390 """
2391 self.raw_cluster_cmd('osd', 'out', str(osd))
2392
2393 def kill_osd(self, osd):
2394 """
2395 Kill osds by either power cycling (if indicated by the config)
2396 or by stopping.
2397 """
2398 if self.config.get('powercycle'):
2399 remote = self.find_remote('osd', osd)
2400 self.log('kill_osd on osd.{o} '
2401 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2402 self._assert_ipmi(remote)
2403 remote.console.power_off()
2404 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2405 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2406 self.inject_args(
2407 'osd', osd,
2408 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
2409 try:
2410 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2411 except Exception:  # wait() is expected to fail when the osd crashes
2412 pass
2413 else:
2414 raise RuntimeError('osd.%s did not fail' % osd)
2415 else:
2416 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2417 else:
2418 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2419
2420 @staticmethod
2421 def _assert_ipmi(remote):
2422 assert remote.console.has_ipmi_credentials, (
2423 "powercycling requested but RemoteConsole is not "
2424 "initialized. Check ipmi config.")
2425
2426 def blackhole_kill_osd(self, osd):
2427 """
2428 Stop osd if nothing else works.
2429 """
2430 self.inject_args('osd', osd,
2431 'objectstore-blackhole', True)
2432 time.sleep(2)
2433 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2434
2435 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
2436 """
2437 Revive osds by either power cycling (if indicated by the config)
2438 or by restarting.
2439 """
2440 if self.config.get('powercycle'):
2441 remote = self.find_remote('osd', osd)
2442 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2443 format(o=osd, s=remote.name))
2444 self._assert_ipmi(remote)
2445 remote.console.power_on()
2446 if not remote.console.check_status(300):
2447 raise Exception('Failed to revive osd.{o} via ipmi'.
2448 format(o=osd))
2449 teuthology.reconnect(self.ctx, 60, [remote])
2450 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2451 self.make_admin_daemon_dir(remote)
2452 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2453 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2454
2455 if not skip_admin_check:
2456 # wait for dump_ops_in_flight; this command doesn't appear
2457 # until after the signal handler is installed and it is safe
2458 # to stop the osd again without making valgrind leak checks
2459 # unhappy. see #5924.
2460 self.wait_run_admin_socket('osd', osd,
2461 args=['dump_ops_in_flight'],
2462 timeout=timeout, stdout=DEVNULL)
2463
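# Example (illustrative only): a minimal kill/revive cycle built from
# the helpers above; it honors the powercycle and bdev_inject_crash
# handling in kill_osd()/revive_osd() (osd id and timeouts are
# hypothetical):
#
#   mgr.kill_osd(3)
#   mgr.mark_down_osd(3)
#   mgr.revive_osd(3, timeout=360)
#   mgr.wait_till_osd_is_up(3, timeout=300)
#   mgr.wait_for_clean()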
2464 def mark_down_osd(self, osd):
2465 """
2466 Cluster command wrapper
2467 """
2468 self.raw_cluster_cmd('osd', 'down', str(osd))
2469
2470 def mark_in_osd(self, osd):
2471 """
2472 Cluster command wrapper
2473 """
2474 self.raw_cluster_cmd('osd', 'in', str(osd))
2475
2476 def signal_osd(self, osd, sig, silent=False):
2477 """
2478 Wrapper to local get_daemon call which sends the given
2479 signal to the given osd.
2480 """
2481 self.ctx.daemons.get_daemon('osd', osd,
2482 self.cluster).signal(sig, silent=silent)
2483
2484 ## monitors
2485 def signal_mon(self, mon, sig, silent=False):
2486 """
2487 Wrapper to local get_daemon call
2488 """
2489 self.ctx.daemons.get_daemon('mon', mon,
2490 self.cluster).signal(sig, silent=silent)
2491
2492 def kill_mon(self, mon):
2493 """
2494 Kill the monitor by either power cycling (if the config says so),
2495 or by doing a stop.
2496 """
2497 if self.config.get('powercycle'):
2498 remote = self.find_remote('mon', mon)
2499 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2500 format(m=mon, s=remote.name))
2501 self._assert_ipmi(remote)
2502 remote.console.power_off()
2503 else:
2504 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2505
2506 def revive_mon(self, mon):
2507 """
2508 Restart by either power cycling (if the config says so),
2509 or by doing a normal restart.
2510 """
2511 if self.config.get('powercycle'):
2512 remote = self.find_remote('mon', mon)
2513 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2514 format(m=mon, s=remote.name))
2515 self._assert_ipmi(remote)
2516 remote.console.power_on()
2517 self.make_admin_daemon_dir(remote)
2518 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2519
2520 def revive_mgr(self, mgr):
2521 """
2522 Restart by either power cycling (if the config says so),
2523 or by doing a normal restart.
2524 """
2525 if self.config.get('powercycle'):
2526 remote = self.find_remote('mgr', mgr)
2527 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2528 format(m=mgr, s=remote.name))
2529 self._assert_ipmi(remote)
2530 remote.console.power_on()
2531 self.make_admin_daemon_dir(remote)
2532 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2533
2534 def get_mon_status(self, mon):
2535 """
2536 Extract all the monitor status information from the cluster
2537 """
2538 addr = self.ctx.ceph[self.cluster].mons['mon.%s' % mon]
2539 out = self.raw_cluster_cmd('-m', addr, 'mon_status')
2540 return json.loads(out)
2541
2542 def get_mon_quorum(self):
2543 """
2544 Extract monitor quorum information from the cluster
2545 """
2546 out = self.raw_cluster_cmd('quorum_status')
2547 j = json.loads(out)
2548 self.log('quorum_status is %s' % out)
2549 return j['quorum']
2550
2551 def wait_for_mon_quorum_size(self, size, timeout=300):
2552 """
2553 Loop until quorum size is reached.
2554 """
2555 self.log('waiting for quorum size %d' % size)
2556 start = time.time()
2557 while len(self.get_mon_quorum()) != size:
2558 if timeout is not None:
2559 assert time.time() - start < timeout, \
2560 ('failed to reach quorum size %d '
2561 'before timeout expired' % size)
2562 time.sleep(3)
2563 self.log("quorum is size %d" % size)
2564
2565 def get_mon_health(self, debug=False):
2566 """
2567 Extract all the monitor health information.
2568 """
2569 out = self.raw_cluster_cmd('health', '--format=json')
2570 if debug:
2571 self.log('health:\n{h}'.format(h=out))
2572 return json.loads(out)
2573
2574 def get_filepath(self):
2575 """
2576 Return path to osd data with {id} needing to be replaced
2577 """
2578 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2579
2580 def make_admin_daemon_dir(self, remote):
2581 """
2582 Create /var/run/ceph directory on remote site.
2583
2584 :param ctx: Context
2585 :param remote: Remote site
2586 """
2587 remote.run(args=['sudo',
2588 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2589
2590
2591 def utility_task(name):
2592 """
2593 Generate ceph_manager subtask corresponding to ceph_manager
2594 method name
2595 """
2596 def task(ctx, config):
2597 if config is None:
2598 config = {}
2599 args = config.get('args', [])
2600 kwargs = config.get('kwargs', {})
2601 cluster = config.get('cluster', 'ceph')
2602 fn = getattr(ctx.managers[cluster], name)
2603 fn(*args, **kwargs)
2604 return task
2605
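# Each assignment below exposes a CephManager method as a teuthology
# task. The generated task reads optional 'args', 'kwargs' and
# 'cluster' keys from its config, so (for example, with hypothetical
# values) a job could run wait_for_clean with a custom timeout via:
#
#   wait_for_clean(ctx, {'kwargs': {'timeout': 600}})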
2606 revive_osd = utility_task("revive_osd")
2607 revive_mon = utility_task("revive_mon")
2608 kill_osd = utility_task("kill_osd")
2609 kill_mon = utility_task("kill_mon")
2610 create_pool = utility_task("create_pool")
2611 remove_pool = utility_task("remove_pool")
2612 wait_for_clean = utility_task("wait_for_clean")
2613 flush_all_pg_stats = utility_task("flush_all_pg_stats")
2614 set_pool_property = utility_task("set_pool_property")
2615 do_pg_scrub = utility_task("do_pg_scrub")
2616 wait_for_pool = utility_task("wait_for_pool")
2617 wait_for_pools = utility_task("wait_for_pools")