1"""
2ceph manager -- Thrasher and CephManager objects
3"""
4from cStringIO import StringIO
5from functools import wraps
6import contextlib
7import random
8import signal
9import time
10import gevent
11import base64
12import json
13import logging
14import threading
15import traceback
16import os
17from teuthology import misc as teuthology
18from tasks.scrub import Scrubber
19from util.rados import cmd_erasure_code_profile
20from util import get_remote
21from teuthology.contextutil import safe_while
22from teuthology.orchestra.remote import Remote
23from teuthology.orchestra import run
24from teuthology.exceptions import CommandFailedError
25
26try:
27 from subprocess import DEVNULL # py3k
28except ImportError:
29 DEVNULL = open(os.devnull, 'r+')
30
31DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33log = logging.getLogger(__name__)
34
35
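# write_conf() below serializes the in-memory configuration held in
# ctx.ceph[cluster].conf and pushes it to conf_path on every remote in the
# run. A typical call (illustrative only, assuming a teuthology ctx is in
# scope) looks like:
#
#     write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph')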
36def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
37 conf_fp = StringIO()
38 ctx.ceph[cluster].conf.write(conf_fp)
39 conf_fp.seek(0)
40 writes = ctx.cluster.run(
41 args=[
42 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
44 'sudo', 'python',
45 '-c',
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
48 conf_path,
49 run.Raw('&&'),
50 'sudo', 'chmod', '0644', conf_path,
51 ],
52 stdin=run.PIPE,
53 wait=False)
54 teuthology.feed_many_stdins_and_close(conf_fp, writes)
55 run.wait(writes)
56
57
58def mount_osd_data(ctx, remote, cluster, osd):
59 """
60 Mount a remote OSD
61
62 :param ctx: Context
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
65 :param osd: Osd name
66 """
67 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68 role = "{0}.osd.{1}".format(cluster, osd)
69 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70 if remote in ctx.disk_config.remote_to_roles_to_dev:
71 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
72 role = alt_role
73 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
74 return
75 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76 mount_options = ctx.disk_config.\
77 remote_to_roles_to_dev_mount_options[remote][role]
78 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
80
81 log.info('Mounting osd.{o}: dev: {d} on {n}, cluster: {c}, '
82 'mountpoint: {p}, type: {t}, options: {v}'.format(
83 o=osd, d=dev, n=remote.name, p=mnt, t=fstype,
84 v=mount_options, c=cluster))
85
86 remote.run(
87 args=[
88 'sudo',
89 'mount',
90 '-t', fstype,
91 '-o', ','.join(mount_options),
92 dev,
93 mnt,
94 ]
95 )
96
97
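# A Thrasher is built from an existing CephManager plus a config dict; the
# constructor alone spawns the background greenlets. A sketch of typical use,
# with hypothetical config values chosen only to illustrate keys that
# __init__ and choose_action() actually read:
#
#     thrasher = Thrasher(manager,
#                         config={'chance_down': 0.4,
#                                 'min_in': 3,
#                                 'chance_pgnum_grow': 1,
#                                 'chance_pgpnum_fix': 1},
#                         logger=log.getChild('thrasher'))
#     ...                      # let it run for a while
#     thrasher.do_join()       # stop the loops and re-join the greenlets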
98class Thrasher:
99 """
100 Object used to thrash Ceph
101 """
102 def __init__(self, manager, config, logger=None):
103 self.ceph_manager = manager
104 self.cluster = manager.cluster
105 self.ceph_manager.wait_for_clean()
106 osd_status = self.ceph_manager.get_osd_status()
107 self.in_osds = osd_status['in']
108 self.live_osds = osd_status['live']
109 self.out_osds = osd_status['out']
110 self.dead_osds = osd_status['dead']
111 self.stopping = False
112 self.logger = logger
113 self.config = config
114 self.revive_timeout = self.config.get("revive_timeout", 150)
115 self.pools_to_fix_pgp_num = set()
116 if self.config.get('powercycle'):
117 self.revive_timeout += 120
118 self.clean_wait = self.config.get('clean_wait', 0)
119 self.minin = self.config.get("min_in", 3)
120 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121 self.sighup_delay = self.config.get('sighup_delay')
122 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123 self.dump_ops_enable = self.config.get('dump_ops_enable')
124 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
127 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
128 self.random_eio = self.config.get('random_eio')
129
130 num_osds = self.in_osds + self.out_osds
131 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
132 if self.logger is not None:
133 self.log = lambda x: self.logger.info(x)
134 else:
135 def tmp(x):
136 """
137 Implement log behavior
138 """
139 print x
140 self.log = tmp
141 if self.config is None:
142 self.config = dict()
143 # prevent monitor from auto-marking things out while thrasher runs
144 # try both old and new tell syntax, in case we are testing old code
145 self.saved_options = []
146 # assuming that the default settings do not vary from one daemon to
147 # another
148 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
149 opts = [('mon', 'mon_osd_down_out_interval', 0)]
150 for service, opt, new_value in opts:
151 old_value = manager.get_config(first_mon[0],
152 first_mon[1],
153 opt)
154 self.saved_options.append((service, opt, old_value))
155 self._set_config(service, '*', opt, new_value)
156 # initialize ceph_objectstore_tool property - must be done before
157 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
158 if (self.config.get('powercycle') or
159 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
160 self.config.get('disable_objectstore_tool_tests', False)):
161 self.ceph_objectstore_tool = False
162 self.test_rm_past_intervals = False
163 if self.config.get('powercycle'):
164 self.log("Unable to test ceph-objectstore-tool, "
165 "powercycle testing")
166 else:
167 self.log("Unable to test ceph-objectstore-tool, "
168 "not available on all OSD nodes")
169 else:
170 self.ceph_objectstore_tool = \
171 self.config.get('ceph_objectstore_tool', True)
172 self.test_rm_past_intervals = \
173 self.config.get('test_rm_past_intervals', True)
174 # spawn do_thrash
175 self.thread = gevent.spawn(self.do_thrash)
176 if self.sighup_delay:
177 self.sighup_thread = gevent.spawn(self.do_sighup)
178 if self.optrack_toggle_delay:
179 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
180 if self.dump_ops_enable == "true":
181 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
182 if self.noscrub_toggle_delay:
183 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
184
185 def _set_config(self, service_type, service_id, name, value):
186 opt_arg = '--{name} {value}'.format(name=name, value=value)
187 whom = '.'.join([service_type, service_id])
188 self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
189 'injectargs', opt_arg)
190
191
192 def cmd_exists_on_osds(self, cmd):
193 allremotes = self.ceph_manager.ctx.cluster.only(\
194 teuthology.is_type('osd', self.cluster)).remotes.keys()
195 allremotes = list(set(allremotes))
196 for remote in allremotes:
197 proc = remote.run(args=['type', cmd], wait=True,
198 check_status=False, stdout=StringIO(),
199 stderr=StringIO())
200 if proc.exitstatus != 0:
201 return False
202 return True
203
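    # kill_osd() below doubles as the ceph-objectstore-tool smoke test: after
    # stopping the daemon it lists the PGs on a dead osd, exports one PG to a
    # file, removes it, and re-imports it (possibly onto a different osd and
    # host) before deleting the export file again.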
204 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
205 """
206 :param osd: Osd to be killed.
207 :mark_down: Mark down if true.
208 :mark_out: Mark out if true.
209 """
210 if osd is None:
211 osd = random.choice(self.live_osds)
212 self.log("Killing osd %s, live_osds are %s" % (str(osd),
213 str(self.live_osds)))
214 self.live_osds.remove(osd)
215 self.dead_osds.append(osd)
216 self.ceph_manager.kill_osd(osd)
217 if mark_down:
218 self.ceph_manager.mark_down_osd(osd)
219 if mark_out and osd in self.in_osds:
220 self.out_osd(osd)
221 if self.ceph_objectstore_tool:
222 self.log("Testing ceph-objectstore-tool on down osd")
223 remote = self.ceph_manager.find_remote('osd', osd)
224 FSPATH = self.ceph_manager.get_filepath()
225 JPATH = os.path.join(FSPATH, "journal")
226 exp_osd = imp_osd = osd
227 exp_remote = imp_remote = remote
228 # If an older osd is available we'll move a pg from there
229 if (len(self.dead_osds) > 1 and
230 random.random() < self.chance_move_pg):
231 exp_osd = random.choice(self.dead_osds[:-1])
232 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
233 if ('keyvaluestore_backend' in
234 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
235 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
236 "--data-path {fpath} --journal-path {jpath} "
237 "--type keyvaluestore "
238 "--log-file="
239 "/var/log/ceph/objectstore_tool.\\$pid.log ".
240 format(fpath=FSPATH, jpath=JPATH))
241 else:
242 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
243 "--data-path {fpath} --journal-path {jpath} "
244 "--log-file="
245 "/var/log/ceph/objectstore_tool.\\$pid.log ".
246 format(fpath=FSPATH, jpath=JPATH))
247 cmd = (prefix + "--op list-pgs").format(id=exp_osd)
248
249 # ceph-objectstore-tool might be temporarily absent during an
250 # upgrade - see http://tracker.ceph.com/issues/18014
251 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
252 while proceed():
253 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
254 wait=True, check_status=False, stdout=StringIO(),
255 stderr=StringIO())
256 if proc.exitstatus == 0:
257 break
258 log.debug("ceph-objectstore-tool binary not present, trying again")
259
260 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
261 # see http://tracker.ceph.com/issues/19556
262 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
263 while proceed():
264 proc = exp_remote.run(args=cmd, wait=True,
265 check_status=False,
266 stdout=StringIO(), stderr=StringIO())
267 if proc.exitstatus == 0:
268 break
269 elif proc.exitstatus == 1 and "OSD has the store locked" in proc.stderr.getvalue():
270 continue
271 else:
272 raise Exception("ceph-objectstore-tool: "
273 "exp list-pgs failure with status {ret}".
274 format(ret=proc.exitstatus))
275
276 pgs = proc.stdout.getvalue().split('\n')[:-1]
277 if len(pgs) == 0:
278 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
279 return
280 pg = random.choice(pgs)
281 exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
282 exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
283 exp_path = os.path.join(exp_path,
284 "exp.{pg}.{id}".format(
285 pg=pg,
286 id=exp_osd))
287 # export
288 cmd = prefix + "--op export --pgid {pg} --file {file}"
289 cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
290 proc = exp_remote.run(args=cmd)
291 if proc.exitstatus:
292 raise Exception("ceph-objectstore-tool: "
293 "export failure with status {ret}".
294 format(ret=proc.exitstatus))
295 # remove
296 cmd = prefix + "--op remove --pgid {pg}"
297 cmd = cmd.format(id=exp_osd, pg=pg)
298 proc = exp_remote.run(args=cmd)
299 if proc.exitstatus:
300 raise Exception("ceph-objectstore-tool: "
301 "remove failure with status {ret}".
302 format(ret=proc.exitstatus))
303 # If there are at least 2 dead osds we might move the pg
304 if exp_osd != imp_osd:
305 # If pg isn't already on this osd, then we will move it there
306 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
307 proc = imp_remote.run(args=cmd, wait=True,
308 check_status=False, stdout=StringIO())
309 if proc.exitstatus:
310 raise Exception("ceph-objectstore-tool: "
311 "imp list-pgs failure with status {ret}".
312 format(ret=proc.exitstatus))
313 pgs = proc.stdout.getvalue().split('\n')[:-1]
314 if pg not in pgs:
315 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
316 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
317 if imp_remote != exp_remote:
318 # Copy export file to the other machine
319 self.log("Transfer export file from {srem} to {trem}".
320 format(srem=exp_remote, trem=imp_remote))
321 tmpexport = Remote.get_file(exp_remote, exp_path)
322 Remote.put_file(imp_remote, tmpexport, exp_path)
323 os.remove(tmpexport)
324 else:
325 # Can't move the pg after all
326 imp_osd = exp_osd
327 imp_remote = exp_remote
328 # import
329 cmd = (prefix + "--op import --file {file}")
330 cmd = cmd.format(id=imp_osd, file=exp_path)
331 proc = imp_remote.run(args=cmd, wait=True, check_status=False,
332 stderr=StringIO())
333 if proc.exitstatus == 1:
334 bogosity = "The OSD you are using is older than the exported PG"
335 if bogosity in proc.stderr.getvalue():
336 self.log("OSD older than exported PG"
337 "...ignored")
338 elif proc.exitstatus == 10:
339 self.log("Pool went away before processing an import"
340 "...ignored")
341 elif proc.exitstatus == 11:
342 self.log("Attempt to import an incompatible export"
343 "...ignored")
344 elif proc.exitstatus:
345 raise Exception("ceph-objectstore-tool: "
346 "import failure with status {ret}".
347 format(ret=proc.exitstatus))
348 cmd = "rm -f {file}".format(file=exp_path)
349 exp_remote.run(args=cmd)
350 if imp_remote != exp_remote:
351 imp_remote.run(args=cmd)
352
353 # apply low split settings to each pool
354 for pool in self.ceph_manager.list_pools():
355 no_sudo_prefix = prefix[5:]
356 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
357 "--filestore-split-multiple 1' sudo -E "
358 + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
359 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
360 output = proc.stderr.getvalue()
361 if 'Couldn\'t find pool' in output:
362 continue
363 if proc.exitstatus:
364 raise Exception("ceph-objectstore-tool apply-layout-settings"
365 " failed with {status}".format(status=proc.exitstatus))
366
367 def rm_past_intervals(self, osd=None):
368 """
369 :param osd: Osd to find pg to remove past intervals
370 """
371 if self.test_rm_past_intervals:
372 if osd is None:
373 osd = random.choice(self.dead_osds)
374 self.log("Use ceph_objectstore_tool to remove past intervals")
375 remote = self.ceph_manager.find_remote('osd', osd)
376 FSPATH = self.ceph_manager.get_filepath()
377 JPATH = os.path.join(FSPATH, "journal")
378 if ('keyvaluestore_backend' in
379 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
380 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
381 "--data-path {fpath} --journal-path {jpath} "
382 "--type keyvaluestore "
383 "--log-file="
384 "/var/log/ceph/objectstore_tool.\\$pid.log ".
385 format(fpath=FSPATH, jpath=JPATH))
386 else:
387 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
388 "--data-path {fpath} --journal-path {jpath} "
389 "--log-file="
390 "/var/log/ceph/objectstore_tool.\\$pid.log ".
391 format(fpath=FSPATH, jpath=JPATH))
392 cmd = (prefix + "--op list-pgs").format(id=osd)
393 proc = remote.run(args=cmd, wait=True,
394 check_status=False, stdout=StringIO())
395 if proc.exitstatus:
396 raise Exception("ceph_objectstore_tool: "
397 "exp list-pgs failure with status {ret}".
398 format(ret=proc.exitstatus))
399 pgs = proc.stdout.getvalue().split('\n')[:-1]
400 if len(pgs) == 0:
401 self.log("No PGs found for osd.{osd}".format(osd=osd))
402 return
403 pg = random.choice(pgs)
404 cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
405 format(id=osd, pg=pg)
406 proc = remote.run(args=cmd)
407 if proc.exitstatus:
408 raise Exception("ceph_objectstore_tool: "
409 "rm-past-intervals failure with status {ret}".
410 format(ret=proc.exitstatus))
411
412 def blackhole_kill_osd(self, osd=None):
413 """
414 If all else fails, kill the osd.
415 :param osd: Osd to be killed.
416 """
417 if osd is None:
418 osd = random.choice(self.live_osds)
419 self.log("Blackholing and then killing osd %s, live_osds are %s" %
420 (str(osd), str(self.live_osds)))
421 self.live_osds.remove(osd)
422 self.dead_osds.append(osd)
423 self.ceph_manager.blackhole_kill_osd(osd)
424
425 def revive_osd(self, osd=None, skip_admin_check=False):
426 """
427 Revive the osd.
428 :param osd: Osd to be revived.
429 """
430 if osd is None:
431 osd = random.choice(self.dead_osds)
432 self.log("Reviving osd %s" % (str(osd),))
433 self.ceph_manager.revive_osd(
434 osd,
435 self.revive_timeout,
436 skip_admin_check=skip_admin_check)
437 self.dead_osds.remove(osd)
438 self.live_osds.append(osd)
439 if self.random_eio > 0 and osd == self.rerrosd:
440 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
441 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
442 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
443 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
444
445
446 def out_osd(self, osd=None):
447 """
448 Mark the osd out
449 :param osd: Osd to be marked.
450 """
451 if osd is None:
452 osd = random.choice(self.in_osds)
453 self.log("Removing osd %s, in_osds are: %s" %
454 (str(osd), str(self.in_osds)))
455 self.ceph_manager.mark_out_osd(osd)
456 self.in_osds.remove(osd)
457 self.out_osds.append(osd)
458
459 def in_osd(self, osd=None):
460 """
461 Mark the osd in, reviving it first if it is dead
462 :param osd: Osd to be marked.
463 """
464 if osd is None:
465 osd = random.choice(self.out_osds)
466 if osd in self.dead_osds:
467 return self.revive_osd(osd)
468 self.log("Adding osd %s" % (str(osd),))
469 self.out_osds.remove(osd)
470 self.in_osds.append(osd)
471 self.ceph_manager.mark_in_osd(osd)
472 self.log("Added osd %s" % (str(osd),))
473
474 def reweight_osd_or_by_util(self, osd=None):
475 """
476 Reweight an osd that is in
477 :param osd: Osd to be marked.
478 """
479 if osd is not None or random.choice([True, False]):
480 if osd is None:
481 osd = random.choice(self.in_osds)
482 val = random.uniform(.1, 1.0)
483 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
484 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
485 str(osd), str(val))
486 else:
487 # do it several times, the option space is large
488 for i in range(5):
489 options = {
490 'max_change': random.choice(['0.05', '1.0', '3.0']),
491 'overage': random.choice(['110', '1000']),
492 'type': random.choice([
493 'reweight-by-utilization',
494 'test-reweight-by-utilization']),
495 }
496 self.log("Reweighting by: %s"%(str(options),))
497 self.ceph_manager.raw_cluster_cmd(
498 'osd',
499 options['type'],
500 options['overage'],
501 options['max_change'])
502
503 def primary_affinity(self, osd=None):
504 if osd is None:
505 osd = random.choice(self.in_osds)
506 if random.random() >= .5:
507 pa = random.random()
508 elif random.random() >= .5:
509 pa = 1
510 else:
511 pa = 0
512 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
513 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
514 str(osd), str(pa))
515
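    # thrash_cluster_full() below briefly drives the cluster into the 'full'
    # condition by lowering the mon full ratio and then restoring it; the
    # same effect can be had from the CLI (shown for illustration):
    #
    #     ceph osd set-full-ratio .001
    #     ceph osd set-full-ratio .95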
516 def thrash_cluster_full(self):
517 """
518 Set and unset cluster full condition
519 """
520 self.log('Setting full ratio to .001')
521 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
522 time.sleep(1)
523 self.log('Setting full ratio back to .95')
524 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
525
526 def thrash_pg_upmap(self):
527 """
528 Install or remove random pg_upmap entries in OSDMap
529 """
530 from random import shuffle
531 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
532 j = json.loads(out)
533 self.log('j is %s' % j)
534 try:
535 if random.random() >= .3:
536 pgs = self.ceph_manager.get_pg_stats()
537 pg = random.choice(pgs)
538 pgid = str(pg['pgid'])
539 poolid = int(pgid.split('.')[0])
540 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
541 if len(sizes) == 0:
542 return
543 n = sizes[0]
544 osds = self.in_osds + self.out_osds
545 shuffle(osds)
546 osds = osds[0:n]
547 self.log('Setting %s to %s' % (pgid, osds))
548 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
549 self.log('cmd %s' % cmd)
550 self.ceph_manager.raw_cluster_cmd(*cmd)
551 else:
552 m = j['pg_upmap']
553 if len(m) > 0:
554 shuffle(m)
555 pg = m[0]['pgid']
556 self.log('Clearing pg_upmap on %s' % pg)
557 self.ceph_manager.raw_cluster_cmd(
558 'osd',
559 'rm-pg-upmap',
560 pg)
561 else:
562 self.log('No pg_upmap entries; doing nothing')
563 except CommandFailedError:
564 self.log('Failed to rm-pg-upmap, ignoring')
565
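    # thrash_pg_upmap_items() below is the pairwise variant of the method
    # above: where pg-upmap takes a full replacement acting set,
    # pg-upmap-items takes [from-osd, to-osd] pairs, which is why it samples
    # n*2 osds for a pool of size n. Illustrative CLI form (hypothetical ids):
    #
    #     ceph osd pg-upmap-items 1.7 0 3 2 5   # in pg 1.7 map osd.0->osd.3, osd.2->osd.5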
566 def thrash_pg_upmap_items(self):
567 """
568 Install or remove random pg_upmap_items entries in OSDMap
569 """
570 from random import shuffle
571 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
572 j = json.loads(out)
573 self.log('j is %s' % j)
574 try:
575 if random.random() >= .3:
576 pgs = self.ceph_manager.get_pg_stats()
577 pg = random.choice(pgs)
578 pgid = str(pg['pgid'])
579 poolid = int(pgid.split('.')[0])
580 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
581 if len(sizes) == 0:
582 return
583 n = sizes[0]
584 osds = self.in_osds + self.out_osds
585 shuffle(osds)
586 osds = osds[0:n*2]
587 self.log('Setting %s to %s' % (pgid, osds))
588 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
589 self.log('cmd %s' % cmd)
590 self.ceph_manager.raw_cluster_cmd(*cmd)
591 else:
592 m = j['pg_upmap_items']
593 if len(m) > 0:
594 shuffle(m)
595 pg = m[0]['pgid']
596 self.log('Clearing pg_upmap_items on %s' % pg)
597 self.ceph_manager.raw_cluster_cmd(
598 'osd',
599 'rm-pg-upmap-items',
600 pg)
601 else:
602 self.log('No pg_upmap_items entries; doing nothing')
603 except CommandFailedError:
604 self.log('Failed to rm-pg-upmap-items, ignoring')
605
606 def all_up(self):
607 """
608 Make sure all osds are up and not out.
609 """
610 while len(self.dead_osds) > 0:
611 self.log("reviving osd")
612 self.revive_osd()
613 while len(self.out_osds) > 0:
614 self.log("inning osd")
615 self.in_osd()
616
617 def all_up_in(self):
618 """
619 Make sure all osds are up and fully in.
620 """
621 self.all_up()
622 for osd in self.live_osds:
623 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
624 str(osd), str(1))
625 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
626 str(osd), str(1))
627
628 def do_join(self):
629 """
630 Break out of this Ceph loop
631 """
632 self.stopping = True
633 self.thread.get()
634 if self.sighup_delay:
635 self.log("joining the do_sighup greenlet")
636 self.sighup_thread.get()
637 if self.optrack_toggle_delay:
638 self.log("joining the do_optrack_toggle greenlet")
639 self.optrack_toggle_thread.join()
640 if self.dump_ops_enable == "true":
641 self.log("joining the do_dump_ops greenlet")
642 self.dump_ops_thread.join()
643 if self.noscrub_toggle_delay:
644 self.log("joining the do_noscrub_toggle greenlet")
645 self.noscrub_toggle_thread.join()
646
647 def grow_pool(self):
648 """
649 Increase the size of the pool
650 """
651 pool = self.ceph_manager.get_pool()
652 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
653 self.log("Growing pool %s" % (pool,))
654 if self.ceph_manager.expand_pool(pool,
655 self.config.get('pool_grow_by', 10),
656 self.max_pgs):
657 self.pools_to_fix_pgp_num.add(pool)
658
659 def fix_pgp_num(self, pool=None):
660 """
661 Fix number of pgs in pool.
662 """
663 if pool is None:
664 pool = self.ceph_manager.get_pool()
665 force = False
666 else:
667 force = True
668 self.log("fixing pg num pool %s" % (pool,))
669 if self.ceph_manager.set_pool_pgpnum(pool, force):
670 self.pools_to_fix_pgp_num.discard(pool)
671
672 def test_pool_min_size(self):
673 """
674 Kill and revive all osds except one.
675 """
676 self.log("test_pool_min_size")
677 self.all_up()
678 self.ceph_manager.wait_for_recovery(
679 timeout=self.config.get('timeout')
680 )
681 the_one = random.choice(self.in_osds)
682 self.log("Killing everyone but %s", the_one)
683 to_kill = filter(lambda x: x != the_one, self.in_osds)
684 [self.kill_osd(i) for i in to_kill]
685 [self.out_osd(i) for i in to_kill]
686 time.sleep(self.config.get("test_pool_min_size_time", 10))
687 self.log("Killing %s" % (the_one,))
688 self.kill_osd(the_one)
689 self.out_osd(the_one)
690 self.log("Reviving everyone but %s" % (the_one,))
691 [self.revive_osd(i) for i in to_kill]
692 [self.in_osd(i) for i in to_kill]
693 self.log("Revived everyone but %s" % (the_one,))
694 self.log("Waiting for clean")
695 self.ceph_manager.wait_for_recovery(
696 timeout=self.config.get('timeout')
697 )
698
699 def inject_pause(self, conf_key, duration, check_after, should_be_down):
700 """
701 Pause injection testing. Check for osd being down when finished.
702 """
703 the_one = random.choice(self.live_osds)
704 self.log("inject_pause on {osd}".format(osd=the_one))
705 self.log(
706 "Testing {key} pause injection for duration {duration}".format(
707 key=conf_key,
708 duration=duration
709 ))
710 self.log(
711 "Checking after {after}, should_be_down={shouldbedown}".format(
712 after=check_after,
713 shouldbedown=should_be_down
714 ))
715 self.ceph_manager.set_config(the_one, **{conf_key: duration})
716 if not should_be_down:
717 return
718 time.sleep(check_after)
719 status = self.ceph_manager.get_osd_status()
720 assert the_one in status['down']
721 time.sleep(duration - check_after + 20)
722 status = self.ceph_manager.get_osd_status()
723 assert the_one not in status['down']
724
725 def test_backfill_full(self):
726 """
727 Test backfills stopping when the replica fills up.
728
729 First, use injectfull admin command to simulate a now full
730 osd by setting it to 0 on all of the OSDs.
731
732 Second, on a random subset, set
733 osd_debug_skip_full_check_in_backfill_reservation to force
734 the more complicated check in do_scan to be exercised.
735
736 Then, verify that all backfills stop.
737 """
738 self.log("injecting backfill full")
739 for i in self.live_osds:
740 self.ceph_manager.set_config(
741 i,
742 osd_debug_skip_full_check_in_backfill_reservation=
743 random.choice(['false', 'true']))
744 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
745 check_status=True, timeout=30, stdout=DEVNULL)
746 for i in range(30):
747 status = self.ceph_manager.compile_pg_status()
748 if 'backfill' not in status.keys():
749 break
750 self.log(
751 "waiting for {still_going} backfills".format(
752 still_going=status.get('backfill')))
753 time.sleep(1)
754 assert('backfill' not in self.ceph_manager.compile_pg_status().keys())
755 for i in self.live_osds:
756 self.ceph_manager.set_config(
757 i,
758 osd_debug_skip_full_check_in_backfill_reservation='false')
759 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
760 check_status=True, timeout=30, stdout=DEVNULL)
761
762 def test_map_discontinuity(self):
763 """
764 1) Allows the osds to recover
765 2) kills an osd
766 3) allows the remaining osds to recover
767 4) waits for some time
768 5) revives the osd
769 This sequence should cause the revived osd to have to handle
770 a map gap since the mons would have trimmed
771 """
772 while len(self.in_osds) < (self.minin + 1):
773 self.in_osd()
774 self.log("Waiting for recovery")
775 self.ceph_manager.wait_for_all_up(
776 timeout=self.config.get('timeout')
777 )
778 # now we wait 20s for the pg status to change, if it takes longer,
779 # the test *should* fail!
780 time.sleep(20)
781 self.ceph_manager.wait_for_clean(
782 timeout=self.config.get('timeout')
783 )
784
785 # now we wait 20s for the backfill replicas to hear about the clean
786 time.sleep(20)
787 self.log("Recovered, killing an osd")
788 self.kill_osd(mark_down=True, mark_out=True)
789 self.log("Waiting for clean again")
790 self.ceph_manager.wait_for_clean(
791 timeout=self.config.get('timeout')
792 )
793 self.log("Waiting for trim")
794 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
795 self.revive_osd()
796
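    # choose_action() below performs a weighted random pick: every candidate
    # gets a weight, a value is drawn uniformly from [0, total), and the first
    # action whose weight exceeds what is left of the draw wins. With
    # illustrative weights [(out_osd, 1.0), (revive_osd, 0.5)], a draw of 1.2
    # skips out_osd (1.2 >= 1.0), subtracts 1.0, and returns revive_osd
    # because 0.2 < 0.5.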
797 def choose_action(self):
798 """
799 Random action selector.
800 """
801 chance_down = self.config.get('chance_down', 0.4)
802 chance_test_min_size = self.config.get('chance_test_min_size', 0)
803 chance_test_backfill_full = \
804 self.config.get('chance_test_backfill_full', 0)
805 if isinstance(chance_down, int):
806 chance_down = float(chance_down) / 100
807 minin = self.minin
808 minout = self.config.get("min_out", 0)
809 minlive = self.config.get("min_live", 2)
810 mindead = self.config.get("min_dead", 0)
811
812 self.log('choose_action: min_in %d min_out '
813 '%d min_live %d min_dead %d' %
814 (minin, minout, minlive, mindead))
815 actions = []
816 if len(self.in_osds) > minin:
817 actions.append((self.out_osd, 1.0,))
818 if len(self.live_osds) > minlive and chance_down > 0:
819 actions.append((self.kill_osd, chance_down,))
820 if len(self.dead_osds) > 1:
821 actions.append((self.rm_past_intervals, 1.0,))
822 if len(self.out_osds) > minout:
823 actions.append((self.in_osd, 1.7,))
824 if len(self.dead_osds) > mindead:
825 actions.append((self.revive_osd, 1.0,))
826 if self.config.get('thrash_primary_affinity', True):
827 actions.append((self.primary_affinity, 1.0,))
828 actions.append((self.reweight_osd_or_by_util,
829 self.config.get('reweight_osd', .5),))
830 actions.append((self.grow_pool,
831 self.config.get('chance_pgnum_grow', 0),))
832 actions.append((self.fix_pgp_num,
833 self.config.get('chance_pgpnum_fix', 0),))
834 actions.append((self.test_pool_min_size,
835 chance_test_min_size,))
836 actions.append((self.test_backfill_full,
837 chance_test_backfill_full,))
838 if self.chance_thrash_cluster_full > 0:
839 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
840 if self.chance_thrash_pg_upmap > 0:
841 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
842 if self.chance_thrash_pg_upmap_items > 0:
843 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
844
845 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
846 for scenario in [
847 (lambda:
848 self.inject_pause(key,
849 self.config.get('pause_short', 3),
850 0,
851 False),
852 self.config.get('chance_inject_pause_short', 1),),
853 (lambda:
854 self.inject_pause(key,
855 self.config.get('pause_long', 80),
856 self.config.get('pause_check_after', 70),
857 True),
858 self.config.get('chance_inject_pause_long', 0),)]:
859 actions.append(scenario)
860
861 total = sum([y for (x, y) in actions])
862 val = random.uniform(0, total)
863 for (action, prob) in actions:
864 if val < prob:
865 return action
866 val -= prob
867 return None
868
869 def log_exc(func):
870 @wraps(func)
871 def wrapper(self):
872 try:
873 return func(self)
874 except:
875 self.log(traceback.format_exc())
876 raise
877 return wrapper
878
879 @log_exc
880 def do_sighup(self):
881 """
882 Loops and sends signal.SIGHUP to a random live osd.
883
884 Loop delay is controlled by the config value sighup_delay.
885 """
886 delay = float(self.sighup_delay)
887 self.log("starting do_sighup with a delay of {0}".format(delay))
888 while not self.stopping:
889 osd = random.choice(self.live_osds)
890 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
891 time.sleep(delay)
892
893 @log_exc
894 def do_optrack_toggle(self):
895 """
896 Loops and toggle op tracking to all osds.
897
898 Loop delay is controlled by the config value optrack_toggle_delay.
899 """
900 delay = float(self.optrack_toggle_delay)
901 osd_state = "true"
902 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
903 while not self.stopping:
904 if osd_state == "true":
905 osd_state = "false"
906 else:
907 osd_state = "true"
908 self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
909 'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
910 gevent.sleep(delay)
911
912 @log_exc
913 def do_dump_ops(self):
914 """
915 Loops and does op dumps on all osds
916 """
917 self.log("starting do_dump_ops")
918 while not self.stopping:
919 for osd in self.live_osds:
920 # Ignore errors because live_osds is in flux
921 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
922 check_status=False, timeout=30, stdout=DEVNULL)
923 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
924 check_status=False, timeout=30, stdout=DEVNULL)
925 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
926 check_status=False, timeout=30, stdout=DEVNULL)
927 gevent.sleep(0)
928
929 @log_exc
930 def do_noscrub_toggle(self):
931 """
932 Loops and toggle noscrub flags
933
934 Loop delay is controlled by the config value noscrub_toggle_delay.
935 """
936 delay = float(self.noscrub_toggle_delay)
937 scrub_state = "none"
938 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
939 while not self.stopping:
940 if scrub_state == "none":
941 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
942 scrub_state = "noscrub"
943 elif scrub_state == "noscrub":
944 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
945 scrub_state = "both"
946 elif scrub_state == "both":
947 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
948 scrub_state = "nodeep-scrub"
949 else:
950 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
951 scrub_state = "none"
952 gevent.sleep(delay)
953 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
954 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
955
956 @log_exc
957 def do_thrash(self):
958 """
959 Loop to select random actions to thrash ceph manager with.
960 """
961 cleanint = self.config.get("clean_interval", 60)
962 scrubint = self.config.get("scrub_interval", -1)
963 maxdead = self.config.get("max_dead", 0)
964 delay = self.config.get("op_delay", 5)
965 self.rerrosd = self.live_osds[0]
966 if self.random_eio > 0:
967 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
968 'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
969 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
970 'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
971 self.log("starting do_thrash")
972 while not self.stopping:
973 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
974 "out_osds: ", self.out_osds,
975 "dead_osds: ", self.dead_osds,
976 "live_osds: ", self.live_osds]]
977 self.log(" ".join(to_log))
978 if random.uniform(0, 1) < (float(delay) / cleanint):
979 while len(self.dead_osds) > maxdead:
980 self.revive_osd()
981 for osd in self.in_osds:
982 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
983 str(osd), str(1))
984 if random.uniform(0, 1) < float(
985 self.config.get('chance_test_map_discontinuity', 0)):
986 self.test_map_discontinuity()
987 else:
988 self.ceph_manager.wait_for_recovery(
989 timeout=self.config.get('timeout')
990 )
991 time.sleep(self.clean_wait)
992 if scrubint > 0:
993 if random.uniform(0, 1) < (float(delay) / scrubint):
994 self.log('Scrubbing while thrashing being performed')
995 Scrubber(self.ceph_manager, self.config)
996 self.choose_action()()
997 time.sleep(delay)
998 if self.random_eio > 0:
999 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1000 'injectargs', '--', '--filestore_debug_random_read_err=0.0')
1001 self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
1002 'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
1003 for pool in list(self.pools_to_fix_pgp_num):
1004 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1005 self.fix_pgp_num(pool)
1006 self.pools_to_fix_pgp_num.clear()
1007 for service, opt, saved_value in self.saved_options:
1008 self._set_config(service, '*', opt, saved_value)
1009 self.saved_options = []
1010 self.all_up_in()
1011
1012
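# ObjectStoreTool wraps one offline ceph-objectstore-tool invocation: it kills
# the target osd, runs the tool against that osd's data/journal paths, and by
# default revives the osd afterwards. A sketch of use, with a hypothetical
# pool and object name:
#
#     ObjectStoreTool(manager, pool='rbd', osd='primary',
#                     object_name='myobj').run(options='', args='list-attrs')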
1013class ObjectStoreTool:
1014
1015 def __init__(self, manager, pool, **kwargs):
1016 self.manager = manager
1017 self.pool = pool
1018 self.osd = kwargs.get('osd', None)
1019 self.object_name = kwargs.get('object_name', None)
1020 self.do_revive = kwargs.get('do_revive', True)
1021 if self.osd and self.pool and self.object_name:
1022 if self.osd == "primary":
1023 self.osd = self.manager.get_object_primary(self.pool,
1024 self.object_name)
1025 assert self.osd
1026 if self.object_name:
1027 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1028 self.object_name,
1029 self.osd)
1030 self.remote = self.manager.ctx.\
1031 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1032 path = self.manager.get_filepath().format(id=self.osd)
1033 self.paths = ("--data-path {path} --journal-path {path}/journal".
1034 format(path=path))
1035
1036 def build_cmd(self, options, args, stdin):
1037 lines = []
1038 if self.object_name:
1039 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1040 "{paths} --pgid {pgid} --op list |"
1041 "grep '\"oid\":\"{name}\"')".
1042 format(paths=self.paths,
1043 pgid=self.pgid,
1044 name=self.object_name))
1045 args = '"$object" ' + args
1046 options += " --pgid {pgid}".format(pgid=self.pgid)
1047 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1048 format(paths=self.paths,
1049 args=args,
1050 options=options))
1051 if stdin:
1052 cmd = ("echo {payload} | base64 --decode | {cmd}".
1053 format(payload=base64.b64encode(stdin),
1054 cmd=cmd))
1055 lines.append(cmd)
1056 return "\n".join(lines)
1057
1058 def run(self, options, args, stdin=None, stdout=None):
1059 if stdout is None:
1060 stdout = StringIO()
1061 self.manager.kill_osd(self.osd)
1062 cmd = self.build_cmd(options, args, stdin)
1063 self.manager.log(cmd)
1064 try:
1065 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1066 check_status=False,
1067 stdout=stdout,
1068 stderr=StringIO())
1069 proc.wait()
1070 if proc.exitstatus != 0:
1071 self.manager.log("failed with " + str(proc.exitstatus))
1072 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1073 raise Exception(error)
1074 finally:
1075 if self.do_revive:
1076 self.manager.revive_osd(self.osd)
1077
1078
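# CephManager is the handle the qa tasks use to drive the cluster; it is
# normally constructed once per cluster against a mon remote. A sketch,
# assuming a teuthology ctx and a mon remote are in scope:
#
#     manager = CephManager(mon_remote, ctx=ctx, config=config,
#                           logger=log.getChild('ceph_manager'),
#                           cluster='ceph')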
1079class CephManager:
1080 """
1081 Ceph manager object.
1082 Contains several local functions that form a bulk of this module.
1083
1084 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1085 the same name.
1086 """
1087
1088 REPLICATED_POOL = 1
1089 ERASURE_CODED_POOL = 3
1090
1091 def __init__(self, controller, ctx=None, config=None, logger=None,
1092 cluster='ceph'):
1093 self.lock = threading.RLock()
1094 self.ctx = ctx
1095 self.config = config
1096 self.controller = controller
1097 self.next_pool_id = 0
1098 self.cluster = cluster
1099 if (logger):
1100 self.log = lambda x: logger.info(x)
1101 else:
1102 def tmp(x):
1103 """
1104 implement log behavior.
1105 """
1106 print x
1107 self.log = tmp
1108 if self.config is None:
1109 self.config = dict()
1110 pools = self.list_pools()
1111 self.pools = {}
1112 for pool in pools:
1113 # we may race with a pool deletion; ignore failures here
1114 try:
1115 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1116 except CommandFailedError:
1117 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1118
1119 def raw_cluster_cmd(self, *args):
1120 """
1121 Run a ceph CLI command against the cluster and return its stdout.
1122 """
1123 testdir = teuthology.get_testdir(self.ctx)
1124 ceph_args = [
1125 'sudo',
1126 'adjust-ulimits',
1127 'ceph-coverage',
1128 '{tdir}/archive/coverage'.format(tdir=testdir),
1129 'timeout',
1130 '120',
1131 'ceph',
1132 '--cluster',
1133 self.cluster,
1134 ]
1135 ceph_args.extend(args)
1136 proc = self.controller.run(
1137 args=ceph_args,
1138 stdout=StringIO(),
1139 )
1140 return proc.stdout.getvalue()
1141
1142 def raw_cluster_cmd_result(self, *args):
1143 """
1144 Run a ceph CLI command against the cluster and return its exit status.
1145 """
1146 testdir = teuthology.get_testdir(self.ctx)
1147 ceph_args = [
1148 'sudo',
1149 'adjust-ulimits',
1150 'ceph-coverage',
1151 '{tdir}/archive/coverage'.format(tdir=testdir),
1152 'timeout',
1153 '120',
1154 'ceph',
1155 '--cluster',
1156 self.cluster,
1157 ]
1158 ceph_args.extend(args)
1159 proc = self.controller.run(
1160 args=ceph_args,
1161 check_status=False,
1162 )
1163 return proc.exitstatus
1164
1165 def run_ceph_w(self):
1166 """
1167 Execute "ceph -w" in the background with stdout connected to a StringIO,
1168 and return the RemoteProcess.
1169 """
1170 return self.controller.run(
1171 args=["sudo",
1172 "daemon-helper",
1173 "kill",
1174 "ceph",
1175 '--cluster',
1176 self.cluster,
1177 "-w"],
1178 wait=False, stdout=StringIO(), stdin=run.PIPE)
1179
1180 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1181 """
1182 Flush pg stats from a list of OSD ids, ensuring they are reflected
1183 all the way to the monitor. Luminous and later only.
1184
1185 :param osds: list of OSDs to flush
1186 :param no_wait: list of OSDs not to wait for seq id. by default, we
1187 wait for all specified osds, but some of them could be
1188 moved out of osdmap, so we cannot get their updated
1189 stat seq from monitor anymore. in that case, you need
1190 to pass a blacklist.
1191 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1192 it. (5 min by default)
1193 """
1194 seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
1195 for osd in osds}
1196 if not wait_for_mon:
1197 return
1198 if no_wait is None:
1199 no_wait = []
1200 for osd, need in seq.iteritems():
1201 if osd in no_wait:
1202 continue
1203 got = 0
1204 while wait_for_mon > 0:
1205 got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
1206 self.log('need seq {need} got {got} for osd.{osd}'.format(
1207 need=need, got=got, osd=osd))
1208 if got >= need:
1209 break
1210 A_WHILE = 1
1211 time.sleep(A_WHILE)
1212 wait_for_mon -= A_WHILE
1213 else:
1214 raise Exception('timed out waiting for mon to be updated with '
1215 'osd.{osd}: {got} < {need}'.
1216 format(osd=osd, got=got, need=need))
1217
1218 def flush_all_pg_stats(self):
1219 self.flush_pg_stats(range(len(self.get_osd_dump())))
1220
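    # do_rados() below runs the rados CLI under the usual coverage/ulimit
    # wrappers; callers pass a remote plus the bare rados arguments, e.g.
    # (hypothetical pool name):
    #
    #     manager.do_rados(manager.controller, ['-p', 'rbd', 'ls'])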
1221 def do_rados(self, remote, cmd, check_status=True):
1222 """
1223 Execute a remote rados command.
1224 """
1225 testdir = teuthology.get_testdir(self.ctx)
1226 pre = [
1227 'adjust-ulimits',
1228 'ceph-coverage',
1229 '{tdir}/archive/coverage'.format(tdir=testdir),
1230 'rados',
1231 '--cluster',
1232 self.cluster,
1233 ]
1234 pre.extend(cmd)
1235 proc = remote.run(
1236 args=pre,
1237 wait=True,
1238 check_status=check_status
1239 )
1240 return proc
1241
1242 def rados_write_objects(self, pool, num_objects, size,
1243 timelimit, threads, cleanup=False):
1244 """
1245 Write rados objects
1246 Threads not used yet.
1247 """
1248 args = [
1249 '-p', pool,
1250 '--num-objects', num_objects,
1251 '-b', size,
1252 'bench', timelimit,
1253 'write'
1254 ]
1255 if not cleanup:
1256 args.append('--no-cleanup')
1257 return self.do_rados(self.controller, map(str, args))
1258
1259 def do_put(self, pool, obj, fname, namespace=None):
1260 """
1261 Implement rados put operation
1262 """
1263 args = ['-p', pool]
1264 if namespace is not None:
1265 args += ['-N', namespace]
1266 args += [
1267 'put',
1268 obj,
1269 fname
1270 ]
1271 return self.do_rados(
1272 self.controller,
1273 args,
1274 check_status=False
1275 ).exitstatus
1276
1277 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1278 """
1279 Implement rados get operation
1280 """
1281 args = ['-p', pool]
1282 if namespace is not None:
1283 args += ['-N', namespace]
1284 args += [
1285 'get',
1286 obj,
1287 fname
1288 ]
1289 return self.do_rados(
1290 self.controller,
1291 args,
1292 check_status=False
1293 ).exitstatus
1294
1295 def do_rm(self, pool, obj, namespace=None):
1296 """
1297 Implement rados rm operation
1298 """
1299 args = ['-p', pool]
1300 if namespace is not None:
1301 args += ['-N', namespace]
1302 args += [
1303 'rm',
1304 obj
1305 ]
1306 return self.do_rados(
1307 self.controller,
1308 args,
1309 check_status=False
1310 ).exitstatus
1311
1312 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1313 if stdout is None:
1314 stdout = StringIO()
1315 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1316
1317 def find_remote(self, service_type, service_id):
1318 """
1319 Get the Remote for the host where a particular service runs.
1320
1321 :param service_type: 'mds', 'osd', 'client'
1322 :param service_id: The second part of a role, e.g. '0' for
1323 the role 'client.0'
1324 :return: a Remote instance for the host where the
1325 requested role is placed
1326 """
1327 return get_remote(self.ctx, self.cluster,
1328 service_type, service_id)
1329
1330 def admin_socket(self, service_type, service_id,
1331 command, check_status=True, timeout=0, stdout=None):
1332 """
1333 Remotely start up ceph specifying the admin socket
1334 :param command: a list of words to use as the command
1335 to the admin socket
1336 """
1337 if stdout is None:
1338 stdout = StringIO()
1339 testdir = teuthology.get_testdir(self.ctx)
1340 remote = self.find_remote(service_type, service_id)
1341 args = [
1342 'sudo',
1343 'adjust-ulimits',
1344 'ceph-coverage',
1345 '{tdir}/archive/coverage'.format(tdir=testdir),
1346 'timeout',
1347 str(timeout),
1348 'ceph',
1349 '--cluster',
1350 self.cluster,
1351 '--admin-daemon',
1352 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1353 cluster=self.cluster,
1354 type=service_type,
1355 id=service_id),
1356 ]
1357 args.extend(command)
1358 return remote.run(
1359 args=args,
1360 stdout=stdout,
1361 wait=True,
1362 check_status=check_status
1363 )
1364
1365 def objectstore_tool(self, pool, options, args, **kwargs):
1366 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1367
1368 def get_pgid(self, pool, pgnum):
1369 """
1370 :param pool: pool name
1371 :param pgnum: pg number
1372 :returns: a string representing this pg.
1373 """
1374 poolnum = self.get_pool_num(pool)
1375 pg_str = "{poolnum}.{pgnum}".format(
1376 poolnum=poolnum,
1377 pgnum=pgnum)
1378 return pg_str
1379
1380 def get_pg_replica(self, pool, pgnum):
1381 """
1382 get replica for pool, pgnum (e.g. (data, 0) -> 0)
1383 """
1384 pg_str = self.get_pgid(pool, pgnum)
1385 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1386 j = json.loads('\n'.join(output.split('\n')[1:]))
1387 return int(j['acting'][-1])
1388 assert False
1389
1390 def wait_for_pg_stats(func):
1391 # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1392 # by default, and take the faulty injection in ms into consideration,
1393 # 12 seconds are more than enough
1394 delays = [1, 1, 2, 3, 5, 8, 13]
1395 @wraps(func)
1396 def wrapper(self, *args, **kwargs):
1397 exc = None
1398 for delay in delays:
1399 try:
1400 return func(self, *args, **kwargs)
1401 except AssertionError as e:
1402 time.sleep(delay)
1403 exc = e
1404 raise exc
1405 return wrapper
1406
1407 def get_pg_primary(self, pool, pgnum):
1408 """
1409 get primary for pool, pgnum (e.g. (data, 0) -> 0)
1410 """
1411 pg_str = self.get_pgid(pool, pgnum)
1412 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1413 j = json.loads('\n'.join(output.split('\n')[1:]))
1414 return int(j['acting'][0])
1415 assert False
1416
1417 def get_pool_num(self, pool):
1418 """
1419 get number for pool (e.g., data -> 2)
1420 """
1421 return int(self.get_pool_dump(pool)['pool'])
1422
1423 def list_pools(self):
1424 """
1425 list all pool names
1426 """
1427 osd_dump = self.get_osd_dump_json()
1428 self.log(osd_dump['pools'])
1429 return [str(i['pool_name']) for i in osd_dump['pools']]
1430
1431 def clear_pools(self):
1432 """
1433 remove all pools
1434 """
1435 [self.remove_pool(i) for i in self.list_pools()]
1436
1437 def kick_recovery_wq(self, osdnum):
1438 """
1439 Run kick_recovery_wq on cluster.
1440 """
1441 return self.raw_cluster_cmd(
1442 'tell', "osd.%d" % (int(osdnum),),
1443 'debug',
1444 'kick_recovery_wq',
1445 '0')
1446
1447 def wait_run_admin_socket(self, service_type,
1448 service_id, args=['version'], timeout=75, stdout=None):
1449 """
1450 If the admin_socket call succeeds, return. Otherwise wait
1451 five seconds and try again.
1452 """
1453 if stdout is None:
1454 stdout = StringIO()
1455 tries = 0
1456 while True:
1457 proc = self.admin_socket(service_type, service_id,
1458 args, check_status=False, stdout=stdout)
1459 if proc.exitstatus == 0:
1460 return proc
1461 else:
1462 tries += 1
1463 if (tries * 5) > timeout:
1464 raise Exception('timed out waiting for admin_socket '
1465 'to appear after {type}.{id} restart'.
1466 format(type=service_type,
1467 id=service_id))
1468 self.log("waiting on admin_socket for {type}-{id}, "
1469 "{command}".format(type=service_type,
1470 id=service_id,
1471 command=args))
1472 time.sleep(5)
1473
1474 def get_pool_dump(self, pool):
1475 """
1476 get the osd dump part of a pool
1477 """
1478 osd_dump = self.get_osd_dump_json()
1479 for i in osd_dump['pools']:
1480 if i['pool_name'] == pool:
1481 return i
1482 assert False
1483
1484 def get_config(self, service_type, service_id, name):
1485 """
1486 :param service_type: service type, like 'mon' (with service_id like 'a')
1487 :param name: the option name
1488 """
1489 proc = self.wait_run_admin_socket(service_type, service_id,
1490 ['config', 'show'])
1491 j = json.loads(proc.stdout.getvalue())
1492 return j[name]
1493
1494 def set_config(self, osdnum, **argdict):
1495 """
1496 :param osdnum: osd number
1497 :param argdict: dictionary containing values to set.
1498 """
1499 for k, v in argdict.iteritems():
1500 self.wait_run_admin_socket(
1501 'osd', osdnum,
1502 ['config', 'set', str(k), str(v)])
1503
1504 def raw_cluster_status(self):
1505 """
1506 Get status from cluster
1507 """
1508 status = self.raw_cluster_cmd('status', '--format=json-pretty')
1509 return json.loads(status)
1510
1511 def raw_osd_status(self):
1512 """
1513 Get osd status from cluster
1514 """
1515 return self.raw_cluster_cmd('osd', 'dump')
1516
1517 def get_osd_status(self):
1518 """
1519 Get osd statuses sorted by states that the osds are in.
1520 """
1521 osd_lines = filter(
1522 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1523 self.raw_osd_status().split('\n'))
1524 self.log(osd_lines)
1525 in_osds = [int(i[4:].split()[0])
1526 for i in filter(lambda x: " in " in x, osd_lines)]
1527 out_osds = [int(i[4:].split()[0])
1528 for i in filter(lambda x: " out " in x, osd_lines)]
1529 up_osds = [int(i[4:].split()[0])
1530 for i in filter(lambda x: " up " in x, osd_lines)]
1531 down_osds = [int(i[4:].split()[0])
1532 for i in filter(lambda x: " down " in x, osd_lines)]
1533 dead_osds = [int(x.id_)
1534 for x in filter(lambda x:
1535 not x.running(),
1536 self.ctx.daemons.
1537 iter_daemons_of_role('osd', self.cluster))]
1538 live_osds = [int(x.id_) for x in
1539 filter(lambda x:
1540 x.running(),
1541 self.ctx.daemons.iter_daemons_of_role('osd',
1542 self.cluster))]
1543 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1544 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1545 'raw': osd_lines}
1546
1547 def get_num_pgs(self):
1548 """
1549 Check cluster status for the number of pgs
1550 """
1551 status = self.raw_cluster_status()
1552 self.log(status)
1553 return status['pgmap']['num_pgs']
1554
1555 def create_erasure_code_profile(self, profile_name, profile):
1556 """
1557 Create an erasure code profile name that can be used as a parameter
1558 when creating an erasure coded pool.
1559 """
1560 with self.lock:
1561 args = cmd_erasure_code_profile(profile_name, profile)
1562 self.raw_cluster_cmd(*args)
1563
1564 def create_pool_with_unique_name(self, pg_num=16,
1565 erasure_code_profile_name=None,
1566 min_size=None,
1567 erasure_code_use_overwrites=False):
1568 """
1569 Create a pool named unique_pool_X where X is unique.
1570 """
1571 name = ""
1572 with self.lock:
1573 name = "unique_pool_%s" % (str(self.next_pool_id),)
1574 self.next_pool_id += 1
1575 self.create_pool(
1576 name,
1577 pg_num,
1578 erasure_code_profile_name=erasure_code_profile_name,
1579 min_size=min_size,
1580 erasure_code_use_overwrites=erasure_code_use_overwrites)
1581 return name
1582
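    # pool() below is a convenience contextmanager that creates the pool on
    # entry and removes it on exit, e.g. (hypothetical pool name):
    #
    #     with manager.pool('tmp_pool', pg_num=16):
    #         pass  # exercise the pool here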
1583 @contextlib.contextmanager
1584 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1585 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1586 yield
1587 self.remove_pool(pool_name)
1588
1589 def create_pool(self, pool_name, pg_num=16,
1590 erasure_code_profile_name=None,
1591 min_size=None,
1592 erasure_code_use_overwrites=False):
1593 """
1594 Create a pool named from the pool_name parameter.
1595 :param pool_name: name of the pool being created.
1596 :param pg_num: initial number of pgs.
1597 :param erasure_code_profile_name: if not None, create an
1598 erasure coded pool using this profile
1599 :param erasure_code_use_overwrites: if true, allow overwrites
1600 """
1601 with self.lock:
1602 assert isinstance(pool_name, basestring)
1603 assert isinstance(pg_num, int)
1604 assert pool_name not in self.pools
1605 self.log("creating pool_name %s" % (pool_name,))
1606 if erasure_code_profile_name:
1607 self.raw_cluster_cmd('osd', 'pool', 'create',
1608 pool_name, str(pg_num), str(pg_num),
1609 'erasure', erasure_code_profile_name)
1610 else:
1611 self.raw_cluster_cmd('osd', 'pool', 'create',
1612 pool_name, str(pg_num))
1613 if min_size is not None:
1614 self.raw_cluster_cmd(
1615 'osd', 'pool', 'set', pool_name,
1616 'min_size',
1617 str(min_size))
1618 if erasure_code_use_overwrites:
1619 self.raw_cluster_cmd(
1620 'osd', 'pool', 'set', pool_name,
1621 'allow_ec_overwrites',
1622 'true')
1623 self.pools[pool_name] = pg_num
1624 time.sleep(1)
1625
1626 def add_pool_snap(self, pool_name, snap_name):
1627 """
1628 Add pool snapshot
1629 :param pool_name: name of pool to snapshot
1630 :param snap_name: name of snapshot to take
1631 """
1632 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1633 str(pool_name), str(snap_name))
1634
1635 def remove_pool_snap(self, pool_name, snap_name):
1636 """
1637 Remove pool snapshot
1638 :param pool_name: name of pool to snapshot
1639 :param snap_name: name of snapshot to remove
1640 """
1641 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1642 str(pool_name), str(snap_name))
1643
1644 def remove_pool(self, pool_name):
1645 """
1646 Remove the indicated pool
1647 :param pool_name: Pool to be removed
1648 """
1649 with self.lock:
1650 assert isinstance(pool_name, basestring)
1651 assert pool_name in self.pools
1652 self.log("removing pool_name %s" % (pool_name,))
1653 del self.pools[pool_name]
1654 self.do_rados(self.controller,
1655 ['rmpool', pool_name, pool_name,
1656 "--yes-i-really-really-mean-it"])
1657
1658 def get_pool(self):
1659 """
1660 Pick a random pool
1661 """
1662 with self.lock:
1663 return random.choice(self.pools.keys())
1664
1665 def get_pool_pg_num(self, pool_name):
1666 """
1667 Return the number of pgs in the pool specified.
1668 """
1669 with self.lock:
1670 assert isinstance(pool_name, basestring)
1671 if pool_name in self.pools:
1672 return self.pools[pool_name]
1673 return 0
1674
1675 def get_pool_property(self, pool_name, prop):
1676 """
1677 :param pool_name: pool
1678 :param prop: property to be checked.
1679 :returns: property as an int value.
1680 """
1681 with self.lock:
1682 assert isinstance(pool_name, basestring)
1683 assert isinstance(prop, basestring)
1684 output = self.raw_cluster_cmd(
1685 'osd',
1686 'pool',
1687 'get',
1688 pool_name,
1689 prop)
1690 return int(output.split()[1])
1691
1692 def set_pool_property(self, pool_name, prop, val):
1693 """
1694 :param pool_name: pool
1695 :param prop: property to be set.
1696 :param val: value to set.
1697
1698 This routine retries if set operation fails.
1699 """
1700 with self.lock:
1701 assert isinstance(pool_name, basestring)
1702 assert isinstance(prop, basestring)
1703 assert isinstance(val, int)
1704 tries = 0
1705 while True:
1706 r = self.raw_cluster_cmd_result(
1707 'osd',
1708 'pool',
1709 'set',
1710 pool_name,
1711 prop,
1712 str(val))
1713 if r != 11: # EAGAIN
1714 break
1715 tries += 1
1716 if tries > 50:
1717 raise Exception('timed out getting EAGAIN '
1718 'when setting pool property %s %s = %s' %
1719 (pool_name, prop, val))
1720 self.log('got EAGAIN setting pool property, '
1721 'waiting a few seconds...')
1722 time.sleep(2)
1723
1724 def expand_pool(self, pool_name, by, max_pgs):
1725 """
1726 Increase the number of pgs in a pool
1727 """
1728 with self.lock:
1729 assert isinstance(pool_name, basestring)
1730 assert isinstance(by, int)
1731 assert pool_name in self.pools
1732 if self.get_num_creating() > 0:
1733 return False
1734 if (self.pools[pool_name] + by) > max_pgs:
1735 return False
1736 self.log("increase pool size by %d" % (by,))
1737 new_pg_num = self.pools[pool_name] + by
1738 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1739 self.pools[pool_name] = new_pg_num
1740 return True
1741
1742 def set_pool_pgpnum(self, pool_name, force):
1743 """
1744 Set pgpnum property of pool_name pool.
1745 """
1746 with self.lock:
1747 assert isinstance(pool_name, basestring)
1748 assert pool_name in self.pools
1749 if not force and self.get_num_creating() > 0:
1750 return False
1751 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1752 return True
1753
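# Sketch of the usual pg expansion sequence (illustrative only; `manager`,
# the pool name and the pg limit are assumptions): grow pg_num first, then
# bring pgp_num up to match once nothing is left in the creating state.
#
#   if manager.expand_pool('rbd', by=8, max_pgs=512):
#       manager.set_pool_pgpnum('rbd', force=False)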
1754 def list_pg_missing(self, pgid):
1755 """
1756 Return the missing objects for the pg with the id specified.
1757 """
1758 r = None
1759 offset = {}
1760 while True:
1761 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
1762 json.dumps(offset))
1763 j = json.loads(out)
1764 if r is None:
1765 r = j
1766 else:
1767 r['objects'].extend(j['objects'])
1768 if 'more' not in j:
1769 break
1770 if j['more'] == 0:
1771 break
1772 offset = j['objects'][-1]['oid']
1773 if 'more' in r:
1774 del r['more']
1775 return r
1776
1777 def get_pg_stats(self):
1778 """
1779 Dump the cluster and get pg stats
1780 """
1781 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
1782 j = json.loads('\n'.join(out.split('\n')[1:]))
1783 return j['pg_stats']
1784
1785 def compile_pg_status(self):
1786 """
1787 Return a histogram of pg state values
1788 """
1789 ret = {}
1790 j = self.get_pg_stats()
1791 for pg in j:
1792 for status in pg['state'].split('+'):
1793 if status not in ret:
1794 ret[status] = 0
1795 ret[status] += 1
1796 return ret
1797
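# Illustrative read of the histogram returned by compile_pg_status() (the
# `manager` name is an assumption); keys are individual state tokens such as
# 'active', 'clean' or 'degraded':
#
#   status = manager.compile_pg_status()
#   if status.get('degraded', 0) > 0:
#       manager.log('%d pgs report degraded' % status['degraded'])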
1798 @wait_for_pg_stats
1799 def with_pg_state(self, pool, pgnum, check):
1800 pgstr = self.get_pgid(pool, pgnum)
1801 stats = self.get_single_pg_stats(pgstr)
1802 assert(check(stats['state']))
1803
1804 @wait_for_pg_stats
1805 def with_pg(self, pool, pgnum, check):
1806 pgstr = self.get_pgid(pool, pgnum)
1807 stats = self.get_single_pg_stats(pgstr)
1808 return check(stats)
1809
1810 def get_last_scrub_stamp(self, pool, pgnum):
1811 """
1812 Get the timestamp of the last scrub.
1813 """
1814 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
1815 return stats["last_scrub_stamp"]
1816
1817 def do_pg_scrub(self, pool, pgnum, stype):
1818 """
1819 Scrub pg and wait for scrubbing to finish
1820 """
1821 init = self.get_last_scrub_stamp(pool, pgnum)
1822 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
1823 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1824 SLEEP_TIME = 10
1825 timer = 0
1826 while init == self.get_last_scrub_stamp(pool, pgnum):
1827 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1828 self.log("waiting for scrub type %s" % (stype,))
1829 if (timer % RESEND_TIMEOUT) == 0:
1830 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1831 # The first time in this loop is the actual request
1832 if timer != 0 and stype == "repair":
1833 self.log("WARNING: Resubmitted a non-idempotent repair")
1834 time.sleep(SLEEP_TIME)
1835 timer += SLEEP_TIME
1836
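# Hedged example: deep-scrub pg 0 of a pool and block until its scrub stamp
# changes (pool name and `manager` are assumptions; stype is passed verbatim
# to `ceph pg <stype> <pgid>`):
#
#   manager.do_pg_scrub('rbd', 0, 'deep-scrub')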
1837 def wait_snap_trimming_complete(self, pool):
1838 """
1839 Wait for snap trimming on pool to end
1840 """
1841 POLL_PERIOD = 10
1842 FATAL_TIMEOUT = 600
1843 start = time.time()
1844 poolnum = self.get_pool_num(pool)
1845 poolnumstr = "%s." % (poolnum,)
1846 while True:
1847 now = time.time()
1848 if (now - start) > FATAL_TIMEOUT:
1849 assert (now - start) < FATAL_TIMEOUT, \
1850 'failed to complete snap trimming before timeout'
1851 all_stats = self.get_pg_stats()
1852 trimming = False
1853 for pg in all_stats:
1854 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
1855 self.log("pg {pg} in trimming, state: {state}".format(
1856 pg=pg['pgid'],
1857 state=pg['state']))
1858 trimming = True
1859 if not trimming:
1860 break
1861 self.log("{pool} still trimming, waiting".format(pool=pool))
1862 time.sleep(POLL_PERIOD)
1863
1864 def get_single_pg_stats(self, pgid):
1865 """
1866 Return pg for the pgid specified.
1867 """
1868 all_stats = self.get_pg_stats()
1869
1870 for pg in all_stats:
1871 if pg['pgid'] == pgid:
1872 return pg
1873
1874 return None
1875
1876 def get_object_pg_with_shard(self, pool, name, osdid):
1877 """
1878 Return the pgid holding the named object; for erasure-coded pools
the shard index of osdid is appended as "<pgid>s<shard>".
"""
1879 pool_dump = self.get_pool_dump(pool)
1880 object_map = self.get_object_map(pool, name)
1881 if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1882 shard = object_map['acting'].index(osdid)
1883 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1884 shard=shard)
1885 else:
1886 return object_map['pgid']
1887
1888 def get_object_primary(self, pool, name):
1889 """
1890 Return the id of the acting primary osd for the named object.
"""
1891 object_map = self.get_object_map(pool, name)
1892 return object_map['acting_primary']
1893
1894 def get_object_map(self, pool, name):
1895 """
1896 osd map --format=json converted to a python object
1897 :returns: the python object
1898 """
1899 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
1900 return json.loads('\n'.join(out.split('\n')[1:]))
1901
1902 def get_osd_dump_json(self):
1903 """
1904 osd dump --format=json converted to a python object
1905 :returns: the python object
1906 """
1907 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
1908 return json.loads('\n'.join(out.split('\n')[1:]))
1909
1910 def get_osd_dump(self):
1911 """
1912 Dump osds
1913 :returns: all osds
1914 """
1915 return self.get_osd_dump_json()['osds']
1916
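# Example of consuming the osd dump (illustrative; `manager` is assumed):
# each entry carries, among other fields, an 'osd' id and an 'up' flag.
#
#   down_osds = [o['osd'] for o in manager.get_osd_dump() if o['up'] == 0]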
1917 def get_stuck_pgs(self, type_, threshold):
1918 """
1919 :returns: stuck pg information from the cluster
1920 """
1921 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
1922 '--format=json')
1923 return json.loads(out)
1924
1925 def get_num_unfound_objects(self):
1926 """
1927 Check cluster status to get the number of unfound objects
1928 """
1929 status = self.raw_cluster_status()
1930 self.log(status)
1931 return status['pgmap'].get('unfound_objects', 0)
1932
1933 def get_num_creating(self):
1934 """
1935 Find the number of pgs in creating mode.
1936 """
1937 pgs = self.get_pg_stats()
1938 num = 0
1939 for pg in pgs:
1940 if 'creating' in pg['state']:
1941 num += 1
1942 return num
1943
1944 def get_num_active_clean(self):
1945 """
1946 Find the number of active and clean pgs.
1947 """
1948 pgs = self.get_pg_stats()
1949 num = 0
1950 for pg in pgs:
1951 if (pg['state'].count('active') and
1952 pg['state'].count('clean') and
1953 not pg['state'].count('stale')):
1954 num += 1
1955 return num
1956
1957 def get_num_active_recovered(self):
1958 """
1959 Find the number of active and recovered pgs.
1960 """
1961 pgs = self.get_pg_stats()
1962 num = 0
1963 for pg in pgs:
1964 if (pg['state'].count('active') and
1965 not pg['state'].count('recover') and
1966 not pg['state'].count('backfill') and
1967 not pg['state'].count('stale')):
1968 num += 1
1969 return num
1970
1971 def get_is_making_recovery_progress(self):
1972 """
1973 Return whether there is recovery progress discernible in the
1974 raw cluster status
1975 """
1976 status = self.raw_cluster_status()
1977 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
1978 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
1979 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
1980 return kps > 0 or bps > 0 or ops > 0
1981
1982 def get_num_active(self):
1983 """
1984 Find the number of active pgs.
1985 """
1986 pgs = self.get_pg_stats()
1987 num = 0
1988 for pg in pgs:
1989 if pg['state'].count('active') and not pg['state'].count('stale'):
1990 num += 1
1991 return num
1992
1993 def get_num_down(self):
1994 """
1995 Find the number of pgs that are down.
1996 """
1997 pgs = self.get_pg_stats()
1998 num = 0
1999 for pg in pgs:
2000 if ((pg['state'].count('down') and not
2001 pg['state'].count('stale')) or
2002 (pg['state'].count('incomplete') and not
2003 pg['state'].count('stale'))):
2004 num += 1
2005 return num
2006
2007 def get_num_active_down(self):
2008 """
2009 Find the number of pgs that are either active or down.
2010 """
2011 pgs = self.get_pg_stats()
2012 num = 0
2013 for pg in pgs:
2014 if ((pg['state'].count('active') and not
2015 pg['state'].count('stale')) or
2016 (pg['state'].count('down') and not
2017 pg['state'].count('stale')) or
2018 (pg['state'].count('incomplete') and not
2019 pg['state'].count('stale'))):
2020 num += 1
2021 return num
2022
2023 def is_clean(self):
2024 """
2025 True if all pgs are clean
2026 """
2027 return self.get_num_active_clean() == self.get_num_pgs()
2028
2029 def is_recovered(self):
2030 """
2031 True if all pgs have recovered
2032 """
2033 return self.get_num_active_recovered() == self.get_num_pgs()
2034
2035 def is_active_or_down(self):
2036 """
2037 True if all pgs are active or down
2038 """
2039 return self.get_num_active_down() == self.get_num_pgs()
2040
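# The is_* predicates above all compare a pg-state count against
# get_num_pgs(), so callers can poll them directly; a minimal sketch
# (names assumed) -- wait_for_clean() below wraps the same loop with a
# progress-aware timeout:
#
#   while not manager.is_clean():
#       time.sleep(3)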
2041 def wait_for_clean(self, timeout=None):
2042 """
2043 Wait until all pgs are clean.
2044 """
2045 self.log("waiting for clean")
2046 start = time.time()
2047 num_active_clean = self.get_num_active_clean()
2048 while not self.is_clean():
2049 if timeout is not None:
2050 if self.get_is_making_recovery_progress():
2051 self.log("making progress, resetting timeout")
2052 start = time.time()
2053 else:
2054 self.log("no progress seen, keeping timeout for now")
2055 if time.time() - start >= timeout:
2056 self.log('dumping pgs')
2057 out = self.raw_cluster_cmd('pg', 'dump')
2058 self.log(out)
2059 assert time.time() - start < timeout, \
2060 'failed to become clean before timeout expired'
2061 cur_active_clean = self.get_num_active_clean()
2062 if cur_active_clean != num_active_clean:
2063 start = time.time()
2064 num_active_clean = cur_active_clean
2065 time.sleep(3)
2066 self.log("clean!")
2067
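# Typical call site (hedged; the timeout value is an assumption). The timeout
# is reset whenever recovery progress or a change in the active+clean count
# is observed, so it bounds stalls rather than total wall-clock time:
#
#   manager.wait_for_clean(timeout=1200)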
2068 def are_all_osds_up(self):
2069 """
2070 Returns true if all osds are up.
2071 """
2072 x = self.get_osd_dump()
2073 return (len(x) == sum([(y['up'] > 0) for y in x]))
2074
2075 def wait_for_all_up(self, timeout=None):
2076 """
2077 When this exits, either the timeout has expired, or all
2078 osds are up.
2079 """
2080 self.log("waiting for all up")
2081 start = time.time()
2082 while not self.are_all_osds_up():
2083 if timeout is not None:
2084 assert time.time() - start < timeout, \
2085 'timeout expired in wait_for_all_up'
2086 time.sleep(3)
2087 self.log("all up!")
2088
2089 def wait_for_recovery(self, timeout=None):
2090 """
2091 Check peering. When this exits, we have recovered.
2092 """
2093 self.log("waiting for recovery to complete")
2094 start = time.time()
2095 num_active_recovered = self.get_num_active_recovered()
2096 while not self.is_recovered():
2097 now = time.time()
2098 if timeout is not None:
2099 if self.get_is_making_recovery_progress():
2100 self.log("making progress, resetting timeout")
2101 start = time.time()
2102 else:
2103 self.log("no progress seen, keeping timeout for now")
2104 if now - start >= timeout:
2105 self.log('dumping pgs')
2106 out = self.raw_cluster_cmd('pg', 'dump')
2107 self.log(out)
2108 assert now - start < timeout, \
2109 'failed to recover before timeout expired'
2110 cur_active_recovered = self.get_num_active_recovered()
2111 if cur_active_recovered != num_active_recovered:
2112 start = time.time()
2113 num_active_recovered = cur_active_recovered
2114 time.sleep(3)
2115 self.log("recovered!")
2116
2117 def wait_for_active(self, timeout=None):
2118 """
2119 Check peering. When this exits, we are definitely active
2120 """
2121 self.log("waiting for peering to complete")
2122 start = time.time()
2123 num_active = self.get_num_active()
2124 while not self.is_active():
2125 if timeout is not None:
2126 if time.time() - start >= timeout:
2127 self.log('dumping pgs')
2128 out = self.raw_cluster_cmd('pg', 'dump')
2129 self.log(out)
2130 assert time.time() - start < timeout, \
2131 'failed to become active before timeout expired'
2132 cur_active = self.get_num_active()
2133 if cur_active != num_active:
2134 start = time.time()
2135 num_active = cur_active
2136 time.sleep(3)
2137 self.log("active!")
2138
2139 def wait_for_active_or_down(self, timeout=None):
2140 """
2141 Check peering. When this exits, we are definitely either
2142 active or down
2143 """
2144 self.log("waiting for peering to complete or become blocked")
2145 start = time.time()
2146 num_active_down = self.get_num_active_down()
2147 while not self.is_active_or_down():
2148 if timeout is not None:
2149 if time.time() - start >= timeout:
2150 self.log('dumping pgs')
2151 out = self.raw_cluster_cmd('pg', 'dump')
2152 self.log(out)
2153 assert time.time() - start < timeout, \
2154 'failed to become active or down before timeout expired'
2155 cur_active_down = self.get_num_active_down()
2156 if cur_active_down != num_active_down:
2157 start = time.time()
2158 num_active_down = cur_active_down
2159 time.sleep(3)
2160 self.log("active or down!")
2161
2162 def osd_is_up(self, osd):
2163 """
2164 Wrapper for osd check
2165 """
2166 osds = self.get_osd_dump()
2167 return osds[osd]['up'] > 0
2168
2169 def wait_till_osd_is_up(self, osd, timeout=None):
2170 """
2171 Loop waiting for osd.
2172 """
2173 self.log('waiting for osd.%d to be up' % osd)
2174 start = time.time()
2175 while not self.osd_is_up(osd):
2176 if timeout is not None:
2177 assert time.time() - start < timeout, \
2178 'osd.%d failed to come up before timeout expired' % osd
2179 time.sleep(3)
2180 self.log('osd.%d is up' % osd)
2181
2182 def is_active(self):
2183 """
2184 Wrapper to check if all pgs are active
2185 """
2186 return self.get_num_active() == self.get_num_pgs()
2187
2188 def wait_till_active(self, timeout=None):
2189 """
2190 Wait until all pgs are active.
2191 """
2192 self.log("waiting till active")
2193 start = time.time()
2194 while not self.is_active():
2195 if timeout is not None:
2196 if time.time() - start >= timeout:
2197 self.log('dumping pgs')
2198 out = self.raw_cluster_cmd('pg', 'dump')
2199 self.log(out)
2200 assert time.time() - start < timeout, \
2201 'failed to become active before timeout expired'
2202 time.sleep(3)
2203 self.log("active!")
2204
2205 def mark_out_osd(self, osd):
2206 """
2207 Wrapper to mark osd out.
2208 """
2209 self.raw_cluster_cmd('osd', 'out', str(osd))
2210
2211 def kill_osd(self, osd):
2212 """
2213 Kill osds by either power cycling (if indicated by the config)
2214 or by stopping.
2215 """
2216 if self.config.get('powercycle'):
2217 remote = self.find_remote('osd', osd)
2218 self.log('kill_osd on osd.{o} '
2219 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2220 self._assert_ipmi(remote)
2221 remote.console.power_off()
2222 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2223 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2224 self.raw_cluster_cmd(
2225 '--', 'tell', 'osd.%d' % osd,
2226 'injectargs',
2227 '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
2228 )
2229 try:
2230 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2231 except:
2232 pass
2233 else:
2234 raise RuntimeError('osd.%s did not fail' % osd)
2235 else:
2236 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2237 else:
2238 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2239
2240 @staticmethod
2241 def _assert_ipmi(remote):
2242 assert remote.console.has_ipmi_credentials, (
2243 "powercycling requested but RemoteConsole is not "
2244 "initialized. Check ipmi config.")
2245
2246 def blackhole_kill_osd(self, osd):
2247 """
2248 Stop osd if nothing else works.
2249 """
2250 self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
2251 'injectargs',
2252 '--objectstore-blackhole')
2253 time.sleep(2)
2254 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2255
2256 def revive_osd(self, osd, timeout=150, skip_admin_check=False):
2257 """
2258 Revive osds by either power cycling (if indicated by the config)
2259 or by restarting.
2260 """
2261 if self.config.get('powercycle'):
2262 remote = self.find_remote('osd', osd)
2263 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2264 format(o=osd, s=remote.name))
2265 self._assert_ipmi(remote)
2266 remote.console.power_on()
2267 if not remote.console.check_status(300):
2268 raise Exception('Failed to revive osd.{o} via ipmi'.
2269 format(o=osd))
2270 teuthology.reconnect(self.ctx, 60, [remote])
2271 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2272 self.make_admin_daemon_dir(remote)
2273 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2274 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2275
2276 if not skip_admin_check:
2277 # wait for dump_ops_in_flight; this command doesn't appear
2278 # until after the signal handler is installed and it is safe
2279 # to stop the osd again without making valgrind leak checks
2280 # unhappy. see #5924.
2281 self.wait_run_admin_socket('osd', osd,
2282 args=['dump_ops_in_flight'],
2283 timeout=timeout, stdout=DEVNULL)
2284
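# Hedged thrashing sketch: take an osd down and bring it back, then wait for
# it to rejoin (the osd id and timeout are assumptions):
#
#   manager.kill_osd(0)
#   manager.mark_down_osd(0)
#   manager.revive_osd(0, timeout=150)
#   manager.wait_till_osd_is_up(0)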
2285 def mark_down_osd(self, osd):
2286 """
2287 Cluster command wrapper
2288 """
2289 self.raw_cluster_cmd('osd', 'down', str(osd))
2290
2291 def mark_in_osd(self, osd):
2292 """
2293 Cluster command wrapper
2294 """
2295 self.raw_cluster_cmd('osd', 'in', str(osd))
2296
2297 def signal_osd(self, osd, sig, silent=False):
2298 """
2299 Wrapper to local get_daemon call which sends the given
2300 signal to the given osd.
2301 """
2302 self.ctx.daemons.get_daemon('osd', osd,
2303 self.cluster).signal(sig, silent=silent)
2304
2305 ## monitors
2306 def signal_mon(self, mon, sig, silent=False):
2307 """
2308 Wrapper to local get_daemon call which sends the given
2309 signal to the given mon.
2309 """
2310 self.ctx.daemons.get_daemon('mon', mon,
2311 self.cluster).signal(sig, silent=silent)
2312
2313 def kill_mon(self, mon):
2314 """
2315 Kill the monitor by either power cycling (if the config says so),
2316 or by doing a stop.
2317 """
2318 if self.config.get('powercycle'):
2319 remote = self.find_remote('mon', mon)
2320 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2321 format(m=mon, s=remote.name))
2322 self._assert_ipmi(remote)
2323 remote.console.power_off()
2324 else:
2325 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2326
2327 def revive_mon(self, mon):
2328 """
2329 Restart by either power cycling (if the config says so),
2330 or by doing a normal restart.
2331 """
2332 if self.config.get('powercycle'):
2333 remote = self.find_remote('mon', mon)
2334 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2335 format(m=mon, s=remote.name))
2336 self._assert_ipmi(remote)
2337 remote.console.power_on()
2338 self.make_admin_daemon_dir(remote)
2339 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2340
2341 def revive_mgr(self, mgr):
2342 """
2343 Restart by either power cycling (if the config says so),
2344 or by doing a normal restart.
2345 """
2346 if self.config.get('powercycle'):
2347 remote = self.find_remote('mgr', mgr)
2348 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2349 format(m=mgr, s=remote.name))
2350 self._assert_ipmi(remote)
2351 remote.console.power_on()
2352 self.make_admin_daemon_dir(remote)
2353 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2354
2355 def get_mon_status(self, mon):
2356 """
2357 Extract all the monitor status information from the cluster
2358 """
2359 addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
2360 out = self.raw_cluster_cmd('-m', addr, 'mon_status')
2361 return json.loads(out)
2362
2363 def get_mon_quorum(self):
2364 """
2365 Extract monitor quorum information from the cluster
2366 """
2367 out = self.raw_cluster_cmd('quorum_status')
2368 j = json.loads(out)
2369 self.log('quorum_status is %s' % out)
2370 return j['quorum']
2371
2372 def wait_for_mon_quorum_size(self, size, timeout=300):
2373 """
2374 Loop until quorum size is reached.
2375 """
2376 self.log('waiting for quorum size %d' % size)
2377 start = time.time()
2378 while not len(self.get_mon_quorum()) == size:
2379 if timeout is not None:
2380 assert time.time() - start < timeout, \
2381 ('failed to reach quorum size %d '
2382 'before timeout expired' % size)
2383 time.sleep(3)
2384 self.log("quorum is size %d" % size)
2385
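# Illustrative monitor restart flow (assuming three mons and that one of
# them is named 'a'): drop a mon, wait for the quorum to shrink, restore it.
#
#   manager.kill_mon('a')
#   manager.wait_for_mon_quorum_size(2)
#   manager.revive_mon('a')
#   manager.wait_for_mon_quorum_size(3)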
2386 def get_mon_health(self, debug=False):
2387 """
2388 Extract all the monitor health information.
2389 """
2390 out = self.raw_cluster_cmd('health', '--format=json')
2391 if debug:
2392 self.log('health:\n{h}'.format(h=out))
2393 return json.loads(out)
2394
2395 def get_mds_status(self, mds):
2396 """
2397 Run cluster commands for the mds in order to get mds information
2398 """
2399 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
2400 j = json.loads(' '.join(out.splitlines()[1:]))
2401 # collate; for dup ids, larger gid wins.
2402 for info in j['info'].itervalues():
2403 if info['name'] == mds:
2404 return info
2405 return None
2406
2407 def get_filepath(self):
2408 """
2409 Return path to osd data with {id} needing to be replaced
2410 """
2411 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2412
2413 def make_admin_daemon_dir(self, remote):
2414 """
2415 Create /var/run/ceph directory on remote site.
2416
2418 :param remote: Remote site
2419 """
2420 remote.run(args=['sudo',
2421 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2422
2423
2424def utility_task(name):
2425 """
2426 Generate ceph_manager subtask corresponding to ceph_manager
2427 method name
2428 """
2429 def task(ctx, config):
2430 if config is None:
2431 config = {}
2432 args = config.get('args', [])
2433 kwargs = config.get('kwargs', {})
2434 cluster = config.get('cluster', 'ceph')
2435 fn = getattr(ctx.managers[cluster], name)
2436 fn(*args, **kwargs)
2437 return task
2438
2439revive_osd = utility_task("revive_osd")
2440revive_mon = utility_task("revive_mon")
2441kill_osd = utility_task("kill_osd")
2442kill_mon = utility_task("kill_mon")
2443create_pool = utility_task("create_pool")
2444remove_pool = utility_task("remove_pool")
2445wait_for_clean = utility_task("wait_for_clean")
2446set_pool_property = utility_task("set_pool_property")
2447do_pg_scrub = utility_task("do_pg_scrub")
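# The utility_task() wrappers above expose CephManager methods as teuthology
# tasks. A hedged sketch of the config shape they consume ('args', 'kwargs'
# and 'cluster' keys, per utility_task above); the exact way a job yaml
# references these task functions depends on the teuthology version:
#
#   tasks:
#   - ceph_manager.wait_for_clean:
#       kwargs:
#         timeout: 1200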