1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from cStringIO import StringIO
5 from functools import wraps
6 import contextlib
7 import random
8 import signal
9 import time
10 import gevent
11 import base64
12 import json
13 import logging
14 import threading
15 import traceback
16 import os
17 from teuthology import misc as teuthology
18 from tasks.scrub import Scrubber
19 from util.rados import cmd_erasure_code_profile
20 from util import get_remote
21 from teuthology.contextutil import safe_while
22 from teuthology.orchestra.remote import Remote
23 from teuthology.orchestra import run
24 from teuthology.exceptions import CommandFailedError
25
26 try:
27 from subprocess import DEVNULL # py3k
28 except ImportError:
29 DEVNULL = open(os.devnull, 'r+')
30
31 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33 log = logging.getLogger(__name__)
34
35
36 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
37 conf_fp = StringIO()
38 ctx.ceph[cluster].conf.write(conf_fp)
39 conf_fp.seek(0)
40 writes = ctx.cluster.run(
41 args=[
42 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
44 'sudo', 'python',
45 '-c',
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
48 conf_path,
49 run.Raw('&&'),
50 'sudo', 'chmod', '0644', conf_path,
51 ],
52 stdin=run.PIPE,
53 wait=False)
54 teuthology.feed_many_stdins_and_close(conf_fp, writes)
55 run.wait(writes)
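# write_conf streams the in-memory conf to every remote over stdin via a
# small sudo'd python copy. A minimal usage sketch (illustrative only,
# assuming ctx.ceph[cluster].conf behaves like a ConfigParser):
#     ctx.ceph['ceph'].conf.set('global', 'osd pool default size', '2')
#     write_conf(ctx, cluster='ceph')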
56
57
58 def mount_osd_data(ctx, remote, cluster, osd):
59 """
60 Mount a remote OSD
61
62 :param ctx: Context
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
65 :param osd: Osd name
66 """
67 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
68 role = "{0}.osd.{1}".format(cluster, osd)
69 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
70 if remote in ctx.disk_config.remote_to_roles_to_dev:
71 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
72 role = alt_role
73 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
74 return
75 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
76 mount_options = ctx.disk_config.\
77 remote_to_roles_to_dev_mount_options[remote][role]
78 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
79 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
80
81 log.info('Mounting osd.{o}: dev: {n}, cluster: {c}, '
82 'mountpoint: {p}, type: {t}, options: {v}'.format(
83 o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
84 c=cluster))
85
86 remote.run(
87 args=[
88 'sudo',
89 'mount',
90 '-t', fstype,
91 '-o', ','.join(mount_options),
92 dev,
93 mnt,
94 ]
95 )
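# The remote.run() above amounts to a plain mount invocation, e.g. for
# osd.0 of cluster 'ceph' on xfs (illustrative device and options):
#     sudo mount -t xfs -o noatime /dev/vdb /var/lib/ceph/osd/ceph-0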
96
97
98 class Thrasher:
99 """
100 Object used to thrash Ceph
101 """
102 def __init__(self, manager, config, logger=None):
103 self.ceph_manager = manager
104 self.cluster = manager.cluster
105 self.ceph_manager.wait_for_clean()
106 osd_status = self.ceph_manager.get_osd_status()
107 self.in_osds = osd_status['in']
108 self.live_osds = osd_status['live']
109 self.out_osds = osd_status['out']
110 self.dead_osds = osd_status['dead']
111 self.stopping = False
112 self.logger = logger
113 self.config = config
114 self.revive_timeout = self.config.get("revive_timeout", 150)
115 self.pools_to_fix_pgp_num = set()
116 if self.config.get('powercycle'):
117 self.revive_timeout += 120
118 self.clean_wait = self.config.get('clean_wait', 0)
119 self.minin = self.config.get("min_in", 3)
120 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
121 self.sighup_delay = self.config.get('sighup_delay')
122 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
123 self.dump_ops_enable = self.config.get('dump_ops_enable')
124 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
125 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
126 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
127 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
128
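# The config dict read throughout __init__ normally comes from a thrashosds
# yaml stanza, roughly like this (illustrative, not exhaustive):
#     tasks:
#     - thrashosds:
#         chance_down: 0.5
#         min_in: 3
#         chance_pgnum_grow: 1
#         chance_pgpnum_fix: 1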
129 num_osds = self.in_osds + self.out_osds
130 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
131 if self.logger is not None:
132 self.log = lambda x: self.logger.info(x)
133 else:
134 def tmp(x):
135 """
136 Implement log behavior
137 """
138 print x
139 self.log = tmp
140 if self.config is None:
141 self.config = dict()
142 # prevent monitor from auto-marking things out while thrasher runs
143 # try both old and new tell syntax, in case we are testing old code
144 self.saved_options = []
145 # assuming that the default settings do not vary from one daemon to
146 # another
147 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
148 opts = [('mon', 'mon_osd_down_out_interval', 0)]
149 for service, opt, new_value in opts:
150 old_value = manager.get_config(first_mon[0],
151 first_mon[1],
152 opt)
153 self.saved_options.append((service, opt, old_value))
154 self._set_config(service, '*', opt, new_value)
155 # initialize ceph_objectstore_tool property - must be done before
156 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
157 if (self.config.get('powercycle') or
158 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
159 self.config.get('disable_objectstore_tool_tests', False)):
160 self.ceph_objectstore_tool = False
161 self.test_rm_past_intervals = False
162 if self.config.get('powercycle'):
163 self.log("Unable to test ceph-objectstore-tool, "
164 "powercycle testing")
165 else:
166 self.log("Unable to test ceph-objectstore-tool, "
167 "not available on all OSD nodes")
168 else:
169 self.ceph_objectstore_tool = \
170 self.config.get('ceph_objectstore_tool', True)
171 self.test_rm_past_intervals = \
172 self.config.get('test_rm_past_intervals', True)
173 # spawn do_thrash
174 self.thread = gevent.spawn(self.do_thrash)
175 if self.sighup_delay:
176 self.sighup_thread = gevent.spawn(self.do_sighup)
177 if self.optrack_toggle_delay:
178 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
179 if self.dump_ops_enable == "true":
180 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
181 if self.noscrub_toggle_delay:
182 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
183
184 def _set_config(self, service_type, service_id, name, value):
185 opt_arg = '--{name} {value}'.format(name=name, value=value)
186 whom = '.'.join([service_type, service_id])
187 self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
188 'injectargs', opt_arg)
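# e.g. _set_config('mon', '*', 'mon_osd_down_out_interval', 0) runs roughly:
#     ceph -- tell mon.* injectargs '--mon_osd_down_out_interval 0'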
189
190
191 def cmd_exists_on_osds(self, cmd):
192 allremotes = self.ceph_manager.ctx.cluster.only(\
193 teuthology.is_type('osd', self.cluster)).remotes.keys()
194 allremotes = list(set(allremotes))
195 for remote in allremotes:
196 proc = remote.run(args=['type', cmd], wait=True,
197 check_status=False, stdout=StringIO(),
198 stderr=StringIO())
199 if proc.exitstatus != 0:
200 return False
201 return True
202
203 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
204 """
205 :param osd: Osd to be killed.
206 :param mark_down: Mark down if true.
207 :param mark_out: Mark out if true.
208 """
209 if osd is None:
210 osd = random.choice(self.live_osds)
211 self.log("Killing osd %s, live_osds are %s" % (str(osd),
212 str(self.live_osds)))
213 self.live_osds.remove(osd)
214 self.dead_osds.append(osd)
215 self.ceph_manager.kill_osd(osd)
216 if mark_down:
217 self.ceph_manager.mark_down_osd(osd)
218 if mark_out and osd in self.in_osds:
219 self.out_osd(osd)
220 if self.ceph_objectstore_tool:
221 self.log("Testing ceph-objectstore-tool on down osd")
222 remote = self.ceph_manager.find_remote('osd', osd)
223 FSPATH = self.ceph_manager.get_filepath()
224 JPATH = os.path.join(FSPATH, "journal")
225 exp_osd = imp_osd = osd
226 exp_remote = imp_remote = remote
227 # If another dead osd is available (one killed earlier), we may export a pg from it instead
228 if (len(self.dead_osds) > 1 and
229 random.random() < self.chance_move_pg):
230 exp_osd = random.choice(self.dead_osds[:-1])
231 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
232 if ('keyvaluestore_backend' in
233 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
234 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
235 "--data-path {fpath} --journal-path {jpath} "
236 "--type keyvaluestore "
237 "--log-file="
238 "/var/log/ceph/objectstore_tool.\\$pid.log ".
239 format(fpath=FSPATH, jpath=JPATH))
240 else:
241 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
242 "--data-path {fpath} --journal-path {jpath} "
243 "--log-file="
244 "/var/log/ceph/objectstore_tool.\\$pid.log ".
245 format(fpath=FSPATH, jpath=JPATH))
246 cmd = (prefix + "--op list-pgs").format(id=exp_osd)
247
248 # ceph-objectstore-tool might be temporarily absent during an
249 # upgrade - see http://tracker.ceph.com/issues/18014
250 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
251 while proceed():
252 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
253 wait=True, check_status=False, stdout=StringIO(),
254 stderr=StringIO())
255 if proc.exitstatus == 0:
256 break
257 log.debug("ceph-objectstore-tool binary not present, trying again")
258
259 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
260 # see http://tracker.ceph.com/issues/19556
261 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
262 while proceed():
263 proc = exp_remote.run(args=cmd, wait=True,
264 check_status=False,
265 stdout=StringIO(), stderr=StringIO())
266 if proc.exitstatus == 0:
267 break
268 elif proc.exitstatus == 1 and "OSD has the store locked" in proc.stderr.getvalue():
269 continue
270 else:
271 raise Exception("ceph-objectstore-tool: "
272 "exp list-pgs failure with status {ret}".
273 format(ret=proc.exitstatus))
274
275 pgs = proc.stdout.getvalue().split('\n')[:-1]
276 if len(pgs) == 0:
277 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
278 return
279 pg = random.choice(pgs)
280 exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
281 exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
282 exp_path = os.path.join(exp_path,
283 "exp.{pg}.{id}".format(
284 pg=pg,
285 id=exp_osd))
286 # export
287 cmd = prefix + "--op export --pgid {pg} --file {file}"
288 cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
289 proc = exp_remote.run(args=cmd)
290 if proc.exitstatus:
291 raise Exception("ceph-objectstore-tool: "
292 "export failure with status {ret}".
293 format(ret=proc.exitstatus))
294 # remove
295 cmd = prefix + "--op remove --pgid {pg}"
296 cmd = cmd.format(id=exp_osd, pg=pg)
297 proc = exp_remote.run(args=cmd)
298 if proc.exitstatus:
299 raise Exception("ceph-objectstore-tool: "
300 "remove failure with status {ret}".
301 format(ret=proc.exitstatus))
302 # If there are at least 2 dead osds we might move the pg
303 if exp_osd != imp_osd:
304 # If pg isn't already on this osd, then we will move it there
305 cmd = (prefix + "--op list-pgs").format(id=imp_osd)
306 proc = imp_remote.run(args=cmd, wait=True,
307 check_status=False, stdout=StringIO())
308 if proc.exitstatus:
309 raise Exception("ceph-objectstore-tool: "
310 "imp list-pgs failure with status {ret}".
311 format(ret=proc.exitstatus))
312 pgs = proc.stdout.getvalue().split('\n')[:-1]
313 if pg not in pgs:
314 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
315 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
316 if imp_remote != exp_remote:
317 # Copy export file to the other machine
318 self.log("Transfer export file from {srem} to {trem}".
319 format(srem=exp_remote, trem=imp_remote))
320 tmpexport = Remote.get_file(exp_remote, exp_path)
321 Remote.put_file(imp_remote, tmpexport, exp_path)
322 os.remove(tmpexport)
323 else:
324 # Can't move the pg after all
325 imp_osd = exp_osd
326 imp_remote = exp_remote
327 # import
328 cmd = (prefix + "--op import --file {file}")
329 cmd = cmd.format(id=imp_osd, file=exp_path)
330 proc = imp_remote.run(args=cmd, wait=True, check_status=False,
331 stderr=StringIO())
332 if proc.exitstatus == 1:
333 bogosity = "The OSD you are using is older than the exported PG"
334 if bogosity in proc.stderr.getvalue():
335 self.log("OSD older than exported PG"
336 "...ignored")
337 elif proc.exitstatus == 10:
338 self.log("Pool went away before processing an import"
339 "...ignored")
340 elif proc.exitstatus == 11:
341 self.log("Attempt to import an incompatible export"
342 "...ignored")
343 elif proc.exitstatus:
344 raise Exception("ceph-objectstore-tool: "
345 "import failure with status {ret}".
346 format(ret=proc.exitstatus))
347 cmd = "rm -f {file}".format(file=exp_path)
348 exp_remote.run(args=cmd)
349 if imp_remote != exp_remote:
350 imp_remote.run(args=cmd)
351
352 # apply low split settings to each pool
353 for pool in self.ceph_manager.list_pools():
354 no_sudo_prefix = prefix[5:]
355 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
356 "--filestore-split-multiple 1' sudo -E "
357 + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
358 proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
359 output = proc.stderr.getvalue()
360 if 'Couldn\'t find pool' in output:
361 continue
362 if proc.exitstatus:
363 raise Exception("ceph-objectstore-tool apply-layout-settings"
364 " failed with {status}".format(status=proc.exitstatus))
365
366 def rm_past_intervals(self, osd=None):
367 """
368 :param osd: Osd to find pg to remove past intervals
369 """
370 if self.test_rm_past_intervals:
371 if osd is None:
372 osd = random.choice(self.dead_osds)
373 self.log("Use ceph_objectstore_tool to remove past intervals")
374 remote = self.ceph_manager.find_remote('osd', osd)
375 FSPATH = self.ceph_manager.get_filepath()
376 JPATH = os.path.join(FSPATH, "journal")
377 if ('keyvaluestore_backend' in
378 self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
379 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
380 "--data-path {fpath} --journal-path {jpath} "
381 "--type keyvaluestore "
382 "--log-file="
383 "/var/log/ceph/objectstore_tool.\\$pid.log ".
384 format(fpath=FSPATH, jpath=JPATH))
385 else:
386 prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
387 "--data-path {fpath} --journal-path {jpath} "
388 "--log-file="
389 "/var/log/ceph/objectstore_tool.\\$pid.log ".
390 format(fpath=FSPATH, jpath=JPATH))
391 cmd = (prefix + "--op list-pgs").format(id=osd)
392 proc = remote.run(args=cmd, wait=True,
393 check_status=False, stdout=StringIO())
394 if proc.exitstatus:
395 raise Exception("ceph_objectstore_tool: "
396 "exp list-pgs failure with status {ret}".
397 format(ret=proc.exitstatus))
398 pgs = proc.stdout.getvalue().split('\n')[:-1]
399 if len(pgs) == 0:
400 self.log("No PGs found for osd.{osd}".format(osd=osd))
401 return
402 pg = random.choice(pgs)
403 cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
404 format(id=osd, pg=pg)
405 proc = remote.run(args=cmd)
406 if proc.exitstatus:
407 raise Exception("ceph_objectstore_tool: "
408 "rm-past-intervals failure with status {ret}".
409 format(ret=proc.exitstatus))
410
411 def blackhole_kill_osd(self, osd=None):
412 """
413 If all else fails, kill the osd.
414 :param osd: Osd to be killed.
415 """
416 if osd is None:
417 osd = random.choice(self.live_osds)
418 self.log("Blackholing and then killing osd %s, live_osds are %s" %
419 (str(osd), str(self.live_osds)))
420 self.live_osds.remove(osd)
421 self.dead_osds.append(osd)
422 self.ceph_manager.blackhole_kill_osd(osd)
423
424 def revive_osd(self, osd=None, skip_admin_check=False):
425 """
426 Revive the osd.
427 :param osd: Osd to be revived.
428 """
429 if osd is None:
430 osd = random.choice(self.dead_osds)
431 self.log("Reviving osd %s" % (str(osd),))
432 self.ceph_manager.revive_osd(
433 osd,
434 self.revive_timeout,
435 skip_admin_check=skip_admin_check)
436 self.dead_osds.remove(osd)
437 self.live_osds.append(osd)
438
439 def out_osd(self, osd=None):
440 """
441 Mark the osd out
442 :param osd: Osd to be marked.
443 """
444 if osd is None:
445 osd = random.choice(self.in_osds)
446 self.log("Removing osd %s, in_osds are: %s" %
447 (str(osd), str(self.in_osds)))
448 self.ceph_manager.mark_out_osd(osd)
449 self.in_osds.remove(osd)
450 self.out_osds.append(osd)
451
452 def in_osd(self, osd=None):
453 """
454 Mark the osd in (or revive it if it is currently dead).
455 :param osd: Osd to be marked.
456 """
457 if osd is None:
458 osd = random.choice(self.out_osds)
459 if osd in self.dead_osds:
460 return self.revive_osd(osd)
461 self.log("Adding osd %s" % (str(osd),))
462 self.out_osds.remove(osd)
463 self.in_osds.append(osd)
464 self.ceph_manager.mark_in_osd(osd)
465 self.log("Added osd %s" % (str(osd),))
466
467 def reweight_osd_or_by_util(self, osd=None):
468 """
469 Reweight a random in osd, or reweight several osds by utilization.
470 :param osd: Osd to be reweighted (chosen at random if None).
471 """
472 if osd is not None or random.choice([True, False]):
473 if osd is None:
474 osd = random.choice(self.in_osds)
475 val = random.uniform(.1, 1.0)
476 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
477 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
478 str(osd), str(val))
479 else:
480 # do it several times, the option space is large
481 for i in range(5):
482 options = {
483 'max_change': random.choice(['0.05', '1.0', '3.0']),
484 'overage': random.choice(['110', '1000']),
485 'type': random.choice([
486 'reweight-by-utilization',
487 'test-reweight-by-utilization']),
488 }
489 self.log("Reweighting by: %s"%(str(options),))
490 self.ceph_manager.raw_cluster_cmd(
491 'osd',
492 options['type'],
493 options['overage'],
494 options['max_change'])
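# The two branches above issue commands like (illustrative values):
#     ceph osd reweight 3 0.65
#     ceph osd reweight-by-utilization 110 0.05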
495
496 def primary_affinity(self, osd=None):
497 if osd is None:
498 osd = random.choice(self.in_osds)
499 if random.random() >= .5:
500 pa = random.random()
501 elif random.random() >= .5:
502 pa = 1
503 else:
504 pa = 0
505 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
506 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
507 str(osd), str(pa))
508
509 def thrash_cluster_full(self):
510 """
511 Set and unset cluster full condition
512 """
513 self.log('Setting full ratio to .001')
514 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
515 time.sleep(1)
516 self.log('Setting full ratio back to .95')
517 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
518
519 def thrash_pg_upmap(self):
520 """
521 Install or remove random pg_upmap entries in OSDMap
522 """
523 from random import shuffle
524 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
525 j = json.loads(out)
526 self.log('j is %s' % j)
527 try:
528 if random.random() >= .3:
529 pgs = self.ceph_manager.get_pg_stats()
530 pg = random.choice(pgs)
531 pgid = str(pg['pgid'])
532 poolid = int(pgid.split('.')[0])
533 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
534 if len(sizes) == 0:
535 return
536 n = sizes[0]
537 osds = self.in_osds + self.out_osds
538 shuffle(osds)
539 osds = osds[0:n]
540 self.log('Setting %s to %s' % (pgid, osds))
541 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
542 self.log('cmd %s' % cmd)
543 self.ceph_manager.raw_cluster_cmd(*cmd)
544 else:
545 m = j['pg_upmap']
546 if len(m) > 0:
547 shuffle(m)
548 pg = m[0]['pgid']
549 self.log('Clearing pg_upmap on %s' % pg)
550 self.ceph_manager.raw_cluster_cmd(
551 'osd',
552 'rm-pg-upmap',
553 pg)
554 else:
555 self.log('No pg_upmap entries; doing nothing')
556 except CommandFailedError:
557 self.log('Failed to rm-pg-upmap, ignoring')
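# pg-upmap pins a whole pg to an explicit osd set, e.g. (illustrative):
#     ceph osd pg-upmap 1.7 3 1 4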
558
559 def thrash_pg_upmap_items(self):
560 """
561 Install or remove random pg_upmap_items entries in OSDMap
562 """
563 from random import shuffle
564 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
565 j = json.loads(out)
566 self.log('j is %s' % j)
567 try:
568 if random.random() >= .3:
569 pgs = self.ceph_manager.get_pg_stats()
570 pg = random.choice(pgs)
571 pgid = str(pg['pgid'])
572 poolid = int(pgid.split('.')[0])
573 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
574 if len(sizes) == 0:
575 return
576 n = sizes[0]
577 osds = self.in_osds + self.out_osds
578 shuffle(osds)
579 osds = osds[0:n*2]
580 self.log('Setting %s to %s' % (pgid, osds))
581 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
582 self.log('cmd %s' % cmd)
583 self.ceph_manager.raw_cluster_cmd(*cmd)
584 else:
585 m = j['pg_upmap_items']
586 if len(m) > 0:
587 shuffle(m)
588 pg = m[0]['pgid']
589 self.log('Clearing pg_upmap_items on %s' % pg)
590 self.ceph_manager.raw_cluster_cmd(
591 'osd',
592 'rm-pg-upmap-items',
593 pg)
594 else:
595 self.log('No pg_upmap_items entries; doing nothing')
596 except CommandFailedError:
597 self.log('Failed to rm-pg-upmap-items, ignoring')
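# pg-upmap-items takes from/to osd pairs instead, e.g. (illustrative):
#     ceph osd pg-upmap-items 1.7 3 5 0 2    # remap osd.3 -> osd.5, osd.0 -> osd.2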
598
599 def all_up(self):
600 """
601 Make sure all osds are up and not out.
602 """
603 while len(self.dead_osds) > 0:
604 self.log("reviving osd")
605 self.revive_osd()
606 while len(self.out_osds) > 0:
607 self.log("inning osd")
608 self.in_osd()
609
610 def all_up_in(self):
611 """
612 Make sure all osds are up and fully in.
613 """
614 self.all_up()
615 for osd in self.live_osds:
616 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
617 str(osd), str(1))
618 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
619 str(osd), str(1))
620
621 def do_join(self):
622 """
623 Stop the thrasher and wait for all of its greenlets to finish.
624 """
625 self.stopping = True
626 self.thread.get()
627 if self.sighup_delay:
628 self.log("joining the do_sighup greenlet")
629 self.sighup_thread.get()
630 if self.optrack_toggle_delay:
631 self.log("joining the do_optrack_toggle greenlet")
632 self.optrack_toggle_thread.join()
633 if self.dump_ops_enable == "true":
634 self.log("joining the do_dump_ops greenlet")
635 self.dump_ops_thread.join()
636 if self.noscrub_toggle_delay:
637 self.log("joining the do_noscrub_toggle greenlet")
638 self.noscrub_toggle_thread.join()
639
640 def grow_pool(self):
641 """
642 Increase the number of pgs in a pool.
643 """
644 pool = self.ceph_manager.get_pool()
645 orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
646 self.log("Growing pool %s" % (pool,))
647 if self.ceph_manager.expand_pool(pool,
648 self.config.get('pool_grow_by', 10),
649 self.max_pgs):
650 self.pools_to_fix_pgp_num.add(pool)
651
652 def fix_pgp_num(self, pool=None):
653 """
654 Fix the pgp_num of a pool to match its pg_num.
655 """
656 if pool is None:
657 pool = self.ceph_manager.get_pool()
658 force = False
659 else:
660 force = True
661 self.log("fixing pg num pool %s" % (pool,))
662 if self.ceph_manager.set_pool_pgpnum(pool, force):
663 self.pools_to_fix_pgp_num.discard(pool)
664
665 def test_pool_min_size(self):
666 """
667 Kill and revive all osds except one.
668 """
669 self.log("test_pool_min_size")
670 self.all_up()
671 self.ceph_manager.wait_for_recovery(
672 timeout=self.config.get('timeout')
673 )
674 the_one = random.choice(self.in_osds)
675 self.log("Killing everyone but %s", the_one)
676 to_kill = filter(lambda x: x != the_one, self.in_osds)
677 [self.kill_osd(i) for i in to_kill]
678 [self.out_osd(i) for i in to_kill]
679 time.sleep(self.config.get("test_pool_min_size_time", 10))
680 self.log("Killing %s" % (the_one,))
681 self.kill_osd(the_one)
682 self.out_osd(the_one)
683 self.log("Reviving everyone but %s" % (the_one,))
684 [self.revive_osd(i) for i in to_kill]
685 [self.in_osd(i) for i in to_kill]
686 self.log("Revived everyone but %s" % (the_one,))
687 self.log("Waiting for clean")
688 self.ceph_manager.wait_for_recovery(
689 timeout=self.config.get('timeout')
690 )
691
692 def inject_pause(self, conf_key, duration, check_after, should_be_down):
693 """
694 Pause injection testing. Check for osd being down when finished.
695 """
696 the_one = random.choice(self.live_osds)
697 self.log("inject_pause on {osd}".format(osd=the_one))
698 self.log(
699 "Testing {key} pause injection for duration {duration}".format(
700 key=conf_key,
701 duration=duration
702 ))
703 self.log(
704 "Checking after {after}, should_be_down={shouldbedown}".format(
705 after=check_after,
706 shouldbedown=should_be_down
707 ))
708 self.ceph_manager.set_config(the_one, **{conf_key: duration})
709 if not should_be_down:
710 return
711 time.sleep(check_after)
712 status = self.ceph_manager.get_osd_status()
713 assert the_one in status['down']
714 time.sleep(duration - check_after + 20)
715 status = self.ceph_manager.get_osd_status()
716 assert the_one not in status['down']
717
718 def test_backfill_full(self):
719 """
720 Test backfills stopping when the replica fills up.
721
722 First, use the injectfull admin command to make every osd
723 report itself as backfillfull.
724
725 Second, on a random subset, set
726 osd_debug_skip_full_check_in_backfill_reservation to force
727 the more complicated check in do_scan to be exercised.
728
729 Then, verify that all backfills stop.
730 """
731 self.log("injecting backfill full")
732 for i in self.live_osds:
733 self.ceph_manager.set_config(
734 i,
735 osd_debug_skip_full_check_in_backfill_reservation=
736 random.choice(['false', 'true']))
737 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
738 check_status=True, timeout=30, stdout=DEVNULL)
739 for i in range(30):
740 status = self.ceph_manager.compile_pg_status()
741 if 'backfill' not in status.keys():
742 break
743 self.log(
744 "waiting for {still_going} backfills".format(
745 still_going=status.get('backfill')))
746 time.sleep(1)
747 assert('backfill' not in self.ceph_manager.compile_pg_status().keys())
748 for i in self.live_osds:
749 self.ceph_manager.set_config(
750 i,
751 osd_debug_skip_full_check_in_backfill_reservation='false')
752 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
753 check_status=True, timeout=30, stdout=DEVNULL)
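# injectfull is delivered over the admin socket; the manual equivalent is
# roughly 'ceph daemon osd.N injectfull backfillfull', and 'injectfull none'
# clears it again.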
754
755 def test_map_discontinuity(self):
756 """
757 1) Allows the osds to recover
758 2) kills an osd
759 3) allows the remaining osds to recover
760 4) waits for some time
761 5) revives the osd
762 This sequence should cause the revived osd to have to handle
763 a map gap, since the mons will have trimmed old osdmaps in the meantime.
764 """
765 while len(self.in_osds) < (self.minin + 1):
766 self.in_osd()
767 self.log("Waiting for recovery")
768 self.ceph_manager.wait_for_all_up(
769 timeout=self.config.get('timeout')
770 )
771 # now we wait 20s for the pg status to change, if it takes longer,
772 # the test *should* fail!
773 time.sleep(20)
774 self.ceph_manager.wait_for_clean(
775 timeout=self.config.get('timeout')
776 )
777
778 # now we wait 20s for the backfill replicas to hear about the clean
779 time.sleep(20)
780 self.log("Recovered, killing an osd")
781 self.kill_osd(mark_down=True, mark_out=True)
782 self.log("Waiting for clean again")
783 self.ceph_manager.wait_for_clean(
784 timeout=self.config.get('timeout')
785 )
786 self.log("Waiting for trim")
787 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
788 self.revive_osd()
789
790 def choose_action(self):
791 """
792 Random action selector.
793 """
794 chance_down = self.config.get('chance_down', 0.4)
795 chance_test_min_size = self.config.get('chance_test_min_size', 0)
796 chance_test_backfill_full = \
797 self.config.get('chance_test_backfill_full', 0)
798 if isinstance(chance_down, int):
799 chance_down = float(chance_down) / 100
800 minin = self.minin
801 minout = self.config.get("min_out", 0)
802 minlive = self.config.get("min_live", 2)
803 mindead = self.config.get("min_dead", 0)
804
805 self.log('choose_action: min_in %d min_out '
806 '%d min_live %d min_dead %d' %
807 (minin, minout, minlive, mindead))
808 actions = []
809 if len(self.in_osds) > minin:
810 actions.append((self.out_osd, 1.0,))
811 if len(self.live_osds) > minlive and chance_down > 0:
812 actions.append((self.kill_osd, chance_down,))
813 if len(self.dead_osds) > 1:
814 actions.append((self.rm_past_intervals, 1.0,))
815 if len(self.out_osds) > minout:
816 actions.append((self.in_osd, 1.7,))
817 if len(self.dead_osds) > mindead:
818 actions.append((self.revive_osd, 1.0,))
819 if self.config.get('thrash_primary_affinity', True):
820 actions.append((self.primary_affinity, 1.0,))
821 actions.append((self.reweight_osd_or_by_util,
822 self.config.get('reweight_osd', .5),))
823 actions.append((self.grow_pool,
824 self.config.get('chance_pgnum_grow', 0),))
825 actions.append((self.fix_pgp_num,
826 self.config.get('chance_pgpnum_fix', 0),))
827 actions.append((self.test_pool_min_size,
828 chance_test_min_size,))
829 actions.append((self.test_backfill_full,
830 chance_test_backfill_full,))
831 if self.chance_thrash_cluster_full > 0:
832 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
833 if self.chance_thrash_pg_upmap > 0:
834 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
835 if self.chance_thrash_pg_upmap_items > 0:
836 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
837
838 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
839 for scenario in [
840 (lambda key=key:  # bind key now; a plain lambda would late-bind the loop variable
841 self.inject_pause(key,
842 self.config.get('pause_short', 3),
843 0,
844 False),
845 self.config.get('chance_inject_pause_short', 1),),
846 (lambda key=key:
847 self.inject_pause(key,
848 self.config.get('pause_long', 80),
849 self.config.get('pause_check_after', 70),
850 True),
851 self.config.get('chance_inject_pause_long', 0),)]:
852 actions.append(scenario)
853
854 total = sum([y for (x, y) in actions])
855 val = random.uniform(0, total)
856 for (action, prob) in actions:
857 if val < prob:
858 return action
859 val -= prob
860 return None
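# The draw above is a standard weighted pick: val is uniform over the total
# weight, and the first action whose weight pushes the running total past
# val is returned.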
861
862 def log_exc(func):
863 @wraps(func)
864 def wrapper(self):
865 try:
866 return func(self)
867 except:
868 self.log(traceback.format_exc())
869 raise
870 return wrapper
871
872 @log_exc
873 def do_sighup(self):
874 """
875 Loops and sends signal.SIGHUP to a random live osd.
876
877 Loop delay is controlled by the config value sighup_delay.
878 """
879 delay = float(self.sighup_delay)
880 self.log("starting do_sighup with a delay of {0}".format(delay))
881 while not self.stopping:
882 osd = random.choice(self.live_osds)
883 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
884 time.sleep(delay)
885
886 @log_exc
887 def do_optrack_toggle(self):
888 """
889 Loops and toggle op tracking to all osds.
890
891 Loop delay is controlled by the config value optrack_toggle_delay.
892 """
893 delay = float(self.optrack_toggle_delay)
894 osd_state = "true"
895 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
896 while not self.stopping:
897 if osd_state == "true":
898 osd_state = "false"
899 else:
900 osd_state = "true"
901 self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
902 'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
903 gevent.sleep(delay)
904
905 @log_exc
906 def do_dump_ops(self):
907 """
908 Loops and does op dumps on all osds
909 """
910 self.log("starting do_dump_ops")
911 while not self.stopping:
912 for osd in self.live_osds:
913 # Ignore errors because live_osds is in flux
914 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
915 check_status=False, timeout=30, stdout=DEVNULL)
916 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
917 check_status=False, timeout=30, stdout=DEVNULL)
918 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
919 check_status=False, timeout=30, stdout=DEVNULL)
920 gevent.sleep(0)
921
922 @log_exc
923 def do_noscrub_toggle(self):
924 """
925 Loops and toggle noscrub flags
926
927 Loop delay is controlled by the config value noscrub_toggle_delay.
928 """
929 delay = float(self.noscrub_toggle_delay)
930 scrub_state = "none"
931 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
932 while not self.stopping:
933 if scrub_state == "none":
934 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
935 scrub_state = "noscrub"
936 elif scrub_state == "noscrub":
937 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
938 scrub_state = "both"
939 elif scrub_state == "both":
940 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
941 scrub_state = "nodeep-scrub"
942 else:
943 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
944 scrub_state = "none"
945 gevent.sleep(delay)
946 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
947 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
948
949 @log_exc
950 def do_thrash(self):
951 """
952 Loop to select random actions to thrash ceph manager with.
953 """
954 cleanint = self.config.get("clean_interval", 60)
955 scrubint = self.config.get("scrub_interval", -1)
956 maxdead = self.config.get("max_dead", 0)
957 delay = self.config.get("op_delay", 5)
958 self.log("starting do_thrash")
959 while not self.stopping:
960 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
961 "out_osds: ", self.out_osds,
962 "dead_osds: ", self.dead_osds,
963 "live_osds: ", self.live_osds]]
964 self.log(" ".join(to_log))
965 if random.uniform(0, 1) < (float(delay) / cleanint):
966 while len(self.dead_osds) > maxdead:
967 self.revive_osd()
968 for osd in self.in_osds:
969 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
970 str(osd), str(1))
971 if random.uniform(0, 1) < float(
972 self.config.get('chance_test_map_discontinuity', 0)):
973 self.test_map_discontinuity()
974 else:
975 self.ceph_manager.wait_for_recovery(
976 timeout=self.config.get('timeout')
977 )
978 time.sleep(self.clean_wait)
979 if scrubint > 0:
980 if random.uniform(0, 1) < (float(delay) / scrubint):
981 self.log('Scrubbing while thrashing being performed')
982 Scrubber(self.ceph_manager, self.config)
983 self.choose_action()()
984 time.sleep(delay)
985 for pool in list(self.pools_to_fix_pgp_num):
986 if self.ceph_manager.get_pool_pg_num(pool) > 0:
987 self.fix_pgp_num(pool)
988 self.pools_to_fix_pgp_num.clear()
989 for service, opt, saved_value in self.saved_options:
990 self._set_config(service, '*', opt, saved_value)
991 self.saved_options = []
992 self.all_up_in()
993
994
995 class ObjectStoreTool:
996
997 def __init__(self, manager, pool, **kwargs):
998 self.manager = manager
999 self.pool = pool
1000 self.osd = kwargs.get('osd', None)
1001 self.object_name = kwargs.get('object_name', None)
1002 self.do_revive = kwargs.get('do_revive', True)
1003 if self.osd and self.pool and self.object_name:
1004 if self.osd == "primary":
1005 self.osd = self.manager.get_object_primary(self.pool,
1006 self.object_name)
1007 assert self.osd
1008 if self.object_name:
1009 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1010 self.object_name,
1011 self.osd)
1012 self.remote = self.manager.ctx.\
1013 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
1014 path = self.manager.get_filepath().format(id=self.osd)
1015 self.paths = ("--data-path {path} --journal-path {path}/journal".
1016 format(path=path))
1017
1018 def build_cmd(self, options, args, stdin):
1019 lines = []
1020 if self.object_name:
1021 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1022 "{paths} --pgid {pgid} --op list |"
1023 "grep '\"oid\":\"{name}\"')".
1024 format(paths=self.paths,
1025 pgid=self.pgid,
1026 name=self.object_name))
1027 args = '"$object" ' + args
1028 options += " --pgid {pgid}".format(pgid=self.pgid)
1029 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1030 format(paths=self.paths,
1031 args=args,
1032 options=options))
1033 if stdin:
1034 cmd = ("echo {payload} | base64 --decode | {cmd}".
1035 format(payload=base64.b64encode(stdin),
1036 cmd=cmd))
1037 lines.append(cmd)
1038 return "\n".join(lines)
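# For an object-scoped call the composed script looks roughly like:
#     object=$(sudo adjust-ulimits ceph-objectstore-tool <paths> --pgid <pgid> --op list | grep '"oid":"<name>"')
#     echo <base64 payload> | base64 --decode | sudo adjust-ulimits ceph-objectstore-tool <paths> <options> --pgid <pgid> "$object" <args>
# (the stdin pipeline is only prepended when a payload is supplied)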
1039
1040 def run(self, options, args, stdin=None, stdout=None):
1041 if stdout is None:
1042 stdout = StringIO()
1043 self.manager.kill_osd(self.osd)
1044 cmd = self.build_cmd(options, args, stdin)
1045 self.manager.log(cmd)
1046 try:
1047 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1048 check_status=False,
1049 stdout=stdout,
1050 stderr=StringIO())
1051 proc.wait()
1052 if proc.exitstatus != 0:
1053 self.manager.log("failed with " + str(proc.exitstatus))
1054 error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
1055 raise Exception(error)
1056 finally:
1057 if self.do_revive:
1058 self.manager.revive_osd(self.osd)
1059
1060
1061 class CephManager:
1062 """
1063 Ceph manager object.
1064 Contains several local functions that form the bulk of this module.
1065
1066 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1067 the same name.
1068 """
1069
1070 REPLICATED_POOL = 1
1071 ERASURE_CODED_POOL = 3
1072
1073 def __init__(self, controller, ctx=None, config=None, logger=None,
1074 cluster='ceph'):
1075 self.lock = threading.RLock()
1076 self.ctx = ctx
1077 self.config = config
1078 self.controller = controller
1079 self.next_pool_id = 0
1080 self.cluster = cluster
1081 if (logger):
1082 self.log = lambda x: logger.info(x)
1083 else:
1084 def tmp(x):
1085 """
1086 implement log behavior.
1087 """
1088 print x
1089 self.log = tmp
1090 if self.config is None:
1091 self.config = dict()
1092 pools = self.list_pools()
1093 self.pools = {}
1094 for pool in pools:
1095 # we may race with a pool deletion; ignore failures here
1096 try:
1097 self.pools[pool] = self.get_pool_property(pool, 'pg_num')
1098 except CommandFailedError:
1099 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1100
1101 def raw_cluster_cmd(self, *args):
1102 """
1103 Run a 'ceph' command against the cluster and return its stdout.
1104 """
1105 testdir = teuthology.get_testdir(self.ctx)
1106 ceph_args = [
1107 'sudo',
1108 'adjust-ulimits',
1109 'ceph-coverage',
1110 '{tdir}/archive/coverage'.format(tdir=testdir),
1111 'timeout',
1112 '120',
1113 'ceph',
1114 '--cluster',
1115 self.cluster,
1116 ]
1117 ceph_args.extend(args)
1118 proc = self.controller.run(
1119 args=ceph_args,
1120 stdout=StringIO(),
1121 )
1122 return proc.stdout.getvalue()
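# e.g. raw_cluster_cmd('osd', 'dump', '--format=json') runs on the controller:
#     sudo adjust-ulimits ceph-coverage <testdir>/archive/coverage timeout 120 ceph --cluster ceph osd dump --format=json
# and returns the command's stdout as a string.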
1123
1124 def raw_cluster_cmd_result(self, *args):
1125 """
1126 Run a 'ceph' command against the cluster and return its exit status.
1127 """
1128 testdir = teuthology.get_testdir(self.ctx)
1129 ceph_args = [
1130 'sudo',
1131 'adjust-ulimits',
1132 'ceph-coverage',
1133 '{tdir}/archive/coverage'.format(tdir=testdir),
1134 'timeout',
1135 '120',
1136 'ceph',
1137 '--cluster',
1138 self.cluster,
1139 ]
1140 ceph_args.extend(args)
1141 proc = self.controller.run(
1142 args=ceph_args,
1143 check_status=False,
1144 )
1145 return proc.exitstatus
1146
1147 def run_ceph_w(self):
1148 """
1149 Execute "ceph -w" in the background with stdout connected to a StringIO,
1150 and return the RemoteProcess.
1151 """
1152 return self.controller.run(
1153 args=["sudo",
1154 "daemon-helper",
1155 "kill",
1156 "ceph",
1157 '--cluster',
1158 self.cluster,
1159 "-w"],
1160 wait=False, stdout=StringIO(), stdin=run.PIPE)
1161
1162 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=3*5):
1163 """
1164 Flush pg stats from a list of OSD ids, ensuring they are reflected
1165 all the way to the monitor. Luminous and later only.
1166
1167 :param osds: list of OSDs to flush
1168 :param no_wait: list of OSDs not to wait for seq id. by default, we
1169 wait for all specified osds, but some of them could be
1170 moved out of osdmap, so we cannot get their updated
1171 stat seq from monitor anymore. in that case, you need
1172 to pass a blacklist.
1173 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1174 it. (3 * mon_mgr_digest_period, by default)
1175 """
1176 seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
1177 for osd in osds}
1178 if not wait_for_mon:
1179 return
1180 if no_wait is None:
1181 no_wait = []
1182 for osd, need in seq.iteritems():
1183 if osd in no_wait:
1184 continue
1185 got = 0
1186 while wait_for_mon > 0:
1187 got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
1188 self.log('need seq {need} got {got} for osd.{osd}'.format(
1189 need=need, got=got, osd=osd))
1190 if got >= need:
1191 break
1192 A_WHILE = 1
1193 time.sleep(A_WHILE)
1194 wait_for_mon -= A_WHILE
1195 else:
1196 raise Exception('timed out waiting for mon to be updated with '
1197 'osd.{osd}: {got} < {need}'.
1198 format(osd=osd, got=got, need=need))
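# In short: each 'tell osd.N flush_pg_stats' returns a stat sequence number,
# and we poll 'ceph osd last-stat-seq osd.N' until the monitor has caught up
# to that sequence (or wait_for_mon seconds elapse).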
1199
1200 def flush_all_pg_stats(self):
1201 self.flush_pg_stats(range(len(self.get_osd_dump())))
1202
1203 def do_rados(self, remote, cmd, check_status=True):
1204 """
1205 Execute a remote rados command.
1206 """
1207 testdir = teuthology.get_testdir(self.ctx)
1208 pre = [
1209 'adjust-ulimits',
1210 'ceph-coverage',
1211 '{tdir}/archive/coverage'.format(tdir=testdir),
1212 'rados',
1213 '--cluster',
1214 self.cluster,
1215 ]
1216 pre.extend(cmd)
1217 proc = remote.run(
1218 args=pre,
1219 wait=True,
1220 check_status=check_status
1221 )
1222 return proc
1223
1224 def rados_write_objects(self, pool, num_objects, size,
1225 timelimit, threads, cleanup=False):
1226 """
1227 Write rados objects using 'rados bench'.
1228 The threads parameter is not used yet.
1229 """
1230 args = [
1231 '-p', pool,
1232 '--num-objects', num_objects,
1233 '-b', size,
1234 'bench', timelimit,
1235 'write'
1236 ]
1237 if not cleanup:
1238 args.append('--no-cleanup')
1239 return self.do_rados(self.controller, map(str, args))
1240
1241 def do_put(self, pool, obj, fname, namespace=None):
1242 """
1243 Implement rados put operation
1244 """
1245 args = ['-p', pool]
1246 if namespace is not None:
1247 args += ['-N', namespace]
1248 args += [
1249 'put',
1250 obj,
1251 fname
1252 ]
1253 return self.do_rados(
1254 self.controller,
1255 args,
1256 check_status=False
1257 ).exitstatus
1258
1259 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1260 """
1261 Implement rados get operation
1262 """
1263 args = ['-p', pool]
1264 if namespace is not None:
1265 args += ['-N', namespace]
1266 args += [
1267 'get',
1268 obj,
1269 fname
1270 ]
1271 return self.do_rados(
1272 self.controller,
1273 args,
1274 check_status=False
1275 ).exitstatus
1276
1277 def do_rm(self, pool, obj, namespace=None):
1278 """
1279 Implement rados rm operation
1280 """
1281 args = ['-p', pool]
1282 if namespace is not None:
1283 args += ['-N', namespace]
1284 args += [
1285 'rm',
1286 obj
1287 ]
1288 return self.do_rados(
1289 self.controller,
1290 args,
1291 check_status=False
1292 ).exitstatus
1293
1294 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1295 if stdout is None:
1296 stdout = StringIO()
1297 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1298
1299 def find_remote(self, service_type, service_id):
1300 """
1301 Get the Remote for the host where a particular service runs.
1302
1303 :param service_type: 'mds', 'osd', 'client'
1304 :param service_id: The second part of a role, e.g. '0' for
1305 the role 'client.0'
1306 :return: a Remote instance for the host where the
1307 requested role is placed
1308 """
1309 return get_remote(self.ctx, self.cluster,
1310 service_type, service_id)
1311
1312 def admin_socket(self, service_type, service_id,
1313 command, check_status=True, timeout=0, stdout=None):
1314 """
1315 Remotely start up ceph specifying the admin socket
1316 :param command: a list of words to use as the command
1317 to the admin socket
1318 """
1319 if stdout is None:
1320 stdout = StringIO()
1321 testdir = teuthology.get_testdir(self.ctx)
1322 remote = self.find_remote(service_type, service_id)
1323 args = [
1324 'sudo',
1325 'adjust-ulimits',
1326 'ceph-coverage',
1327 '{tdir}/archive/coverage'.format(tdir=testdir),
1328 'timeout',
1329 str(timeout),
1330 'ceph',
1331 '--cluster',
1332 self.cluster,
1333 '--admin-daemon',
1334 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1335 cluster=self.cluster,
1336 type=service_type,
1337 id=service_id),
1338 ]
1339 args.extend(command)
1340 return remote.run(
1341 args=args,
1342 stdout=stdout,
1343 wait=True,
1344 check_status=check_status
1345 )
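# e.g. admin_socket('osd', 2, ['dump_ops_in_flight']) talks to
# /var/run/ceph/ceph-osd.2.asok on whichever remote hosts osd.2
# (for the default cluster name 'ceph').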
1346
1347 def objectstore_tool(self, pool, options, args, **kwargs):
1348 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1349
1350 def get_pgid(self, pool, pgnum):
1351 """
1352 :param pool: pool name
1353 :param pgnum: pg number
1354 :returns: a string representing this pg.
1355 """
1356 poolnum = self.get_pool_num(pool)
1357 pg_str = "{poolnum}.{pgnum}".format(
1358 poolnum=poolnum,
1359 pgnum=pgnum)
1360 return pg_str
1361
1362 def get_pg_replica(self, pool, pgnum):
1363 """
1364 get replica for pool, pgnum (e.g. (data, 0)->0
1365 """
1366 pg_str = self.get_pgid(pool, pgnum)
1367 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1368 j = json.loads('\n'.join(output.split('\n')[1:]))
1369 return int(j['acting'][-1])
1370 assert False
1371
1372 def wait_for_pg_stats(func):
1373 # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
1374 # by default; taking fault injection into consideration, the backoff
1375 # delays below (about 33 seconds of sleep in total) are more than enough
1376 delays = [1, 1, 2, 3, 5, 8, 13]
1377 @wraps(func)
1378 def wrapper(self, *args, **kwargs):
1379 exc = None
1380 for delay in delays:
1381 try:
1382 return func(self, *args, **kwargs)
1383 except AssertionError as e:
1384 time.sleep(delay)
1385 exc = e
1386 raise exc
1387 return wrapper
1388
1389 def get_pg_primary(self, pool, pgnum):
1390 """
1391 get primary for pool, pgnum (e.g. (data, 0)->0
1392 """
1393 pg_str = self.get_pgid(pool, pgnum)
1394 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1395 j = json.loads('\n'.join(output.split('\n')[1:]))
1396 return int(j['acting'][0])
1397 assert False
1398
1399 def get_pool_num(self, pool):
1400 """
1401 get number for pool (e.g., data -> 2)
1402 """
1403 return int(self.get_pool_dump(pool)['pool'])
1404
1405 def list_pools(self):
1406 """
1407 list all pool names
1408 """
1409 osd_dump = self.get_osd_dump_json()
1410 self.log(osd_dump['pools'])
1411 return [str(i['pool_name']) for i in osd_dump['pools']]
1412
1413 def clear_pools(self):
1414 """
1415 remove all pools
1416 """
1417 [self.remove_pool(i) for i in self.list_pools()]
1418
1419 def kick_recovery_wq(self, osdnum):
1420 """
1421 Run kick_recovery_wq on cluster.
1422 """
1423 return self.raw_cluster_cmd(
1424 'tell', "osd.%d" % (int(osdnum),),
1425 'debug',
1426 'kick_recovery_wq',
1427 '0')
1428
1429 def wait_run_admin_socket(self, service_type,
1430 service_id, args=['version'], timeout=75, stdout=None):
1431 """
1432 If the admin_socket call succeeds, return. Otherwise wait
1433 five seconds and try again.
1434 """
1435 if stdout is None:
1436 stdout = StringIO()
1437 tries = 0
1438 while True:
1439 proc = self.admin_socket(service_type, service_id,
1440 args, check_status=False, stdout=stdout)
1441 if proc.exitstatus == 0:
1442 return proc
1443 else:
1444 tries += 1
1445 if (tries * 5) > timeout:
1446 raise Exception('timed out waiting for admin_socket '
1447 'to appear after {type}.{id} restart'.
1448 format(type=service_type,
1449 id=service_id))
1450 self.log("waiting on admin_socket for {type}-{id}, "
1451 "{command}".format(type=service_type,
1452 id=service_id,
1453 command=args))
1454 time.sleep(5)
1455
1456 def get_pool_dump(self, pool):
1457 """
1458 get the osd dump part of a pool
1459 """
1460 osd_dump = self.get_osd_dump_json()
1461 for i in osd_dump['pools']:
1462 if i['pool_name'] == pool:
1463 return i
1464 assert False
1465
1466 def get_config(self, service_type, service_id, name):
1467 """
1468 :param service_type: daemon type, e.g. 'mon'
1468 :param service_id: daemon id, e.g. 'a'
1469 :param name: the option name
1470 """
1471 proc = self.wait_run_admin_socket(service_type, service_id,
1472 ['config', 'show'])
1473 j = json.loads(proc.stdout.getvalue())
1474 return j[name]
1475
1476 def set_config(self, osdnum, **argdict):
1477 """
1478 :param osdnum: osd number
1479 :param argdict: dictionary containing values to set.
1480 """
1481 for k, v in argdict.iteritems():
1482 self.wait_run_admin_socket(
1483 'osd', osdnum,
1484 ['config', 'set', str(k), str(v)])
1485
1486 def raw_cluster_status(self):
1487 """
1488 Get status from cluster
1489 """
1490 status = self.raw_cluster_cmd('status', '--format=json-pretty')
1491 return json.loads(status)
1492
1493 def raw_osd_status(self):
1494 """
1495 Get osd status from cluster
1496 """
1497 return self.raw_cluster_cmd('osd', 'dump')
1498
1499 def get_osd_status(self):
1500 """
1501 Get osd statuses sorted by states that the osds are in.
1502 """
1503 osd_lines = filter(
1504 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1505 self.raw_osd_status().split('\n'))
1506 self.log(osd_lines)
1507 in_osds = [int(i[4:].split()[0])
1508 for i in filter(lambda x: " in " in x, osd_lines)]
1509 out_osds = [int(i[4:].split()[0])
1510 for i in filter(lambda x: " out " in x, osd_lines)]
1511 up_osds = [int(i[4:].split()[0])
1512 for i in filter(lambda x: " up " in x, osd_lines)]
1513 down_osds = [int(i[4:].split()[0])
1514 for i in filter(lambda x: " down " in x, osd_lines)]
1515 dead_osds = [int(x.id_)
1516 for x in filter(lambda x:
1517 not x.running(),
1518 self.ctx.daemons.
1519 iter_daemons_of_role('osd', self.cluster))]
1520 live_osds = [int(x.id_) for x in
1521 filter(lambda x:
1522 x.running(),
1523 self.ctx.daemons.iter_daemons_of_role('osd',
1524 self.cluster))]
1525 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1526 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1527 'raw': osd_lines}
1528
1529 def get_num_pgs(self):
1530 """
1531 Check cluster status for the number of pgs
1532 """
1533 status = self.raw_cluster_status()
1534 self.log(status)
1535 return status['pgmap']['num_pgs']
1536
1537 def create_erasure_code_profile(self, profile_name, profile):
1538 """
1539 Create an erasure code profile that can be used as a parameter
1540 when creating an erasure coded pool.
1541 """
1542 with self.lock:
1543 args = cmd_erasure_code_profile(profile_name, profile)
1544 self.raw_cluster_cmd(*args)
1545
1546 def create_pool_with_unique_name(self, pg_num=16,
1547 erasure_code_profile_name=None,
1548 min_size=None,
1549 erasure_code_use_overwrites=False):
1550 """
1551 Create a pool named unique_pool_X where X is unique.
1552 """
1553 name = ""
1554 with self.lock:
1555 name = "unique_pool_%s" % (str(self.next_pool_id),)
1556 self.next_pool_id += 1
1557 self.create_pool(
1558 name,
1559 pg_num,
1560 erasure_code_profile_name=erasure_code_profile_name,
1561 min_size=min_size,
1562 erasure_code_use_overwrites=erasure_code_use_overwrites)
1563 return name
1564
1565 @contextlib.contextmanager
1566 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1567 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1568 yield
1569 self.remove_pool(pool_name)
1570
1571 def create_pool(self, pool_name, pg_num=16,
1572 erasure_code_profile_name=None,
1573 min_size=None,
1574 erasure_code_use_overwrites=False):
1575 """
1576 Create a pool named from the pool_name parameter.
1577 :param pool_name: name of the pool being created.
1578 :param pg_num: initial number of pgs.
1579 :param erasure_code_profile_name: if not None, create an
1580 erasure coded pool using the profile
1581 :param erasure_code_use_overwrites: if true, allow overwrites
1582 """
1583 with self.lock:
1584 assert isinstance(pool_name, basestring)
1585 assert isinstance(pg_num, int)
1586 assert pool_name not in self.pools
1587 self.log("creating pool_name %s" % (pool_name,))
1588 if erasure_code_profile_name:
1589 self.raw_cluster_cmd('osd', 'pool', 'create',
1590 pool_name, str(pg_num), str(pg_num),
1591 'erasure', erasure_code_profile_name)
1592 else:
1593 self.raw_cluster_cmd('osd', 'pool', 'create',
1594 pool_name, str(pg_num))
1595 if min_size is not None:
1596 self.raw_cluster_cmd(
1597 'osd', 'pool', 'set', pool_name,
1598 'min_size',
1599 str(min_size))
1600 if erasure_code_use_overwrites:
1601 self.raw_cluster_cmd(
1602 'osd', 'pool', 'set', pool_name,
1603 'allow_ec_overwrites',
1604 'true')
1605 self.pools[pool_name] = pg_num
1606 time.sleep(1)
1607
1608 def add_pool_snap(self, pool_name, snap_name):
1609 """
1610 Add pool snapshot
1611 :param pool_name: name of pool to snapshot
1612 :param snap_name: name of snapshot to take
1613 """
1614 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1615 str(pool_name), str(snap_name))
1616
1617 def remove_pool_snap(self, pool_name, snap_name):
1618 """
1619 Remove pool snapshot
1620 :param pool_name: name of pool to snapshot
1621 :param snap_name: name of snapshot to remove
1622 """
1623 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1624 str(pool_name), str(snap_name))
1625
1626 def remove_pool(self, pool_name):
1627 """
1628 Remove the indicated pool
1629 :param pool_name: Pool to be removed
1630 """
1631 with self.lock:
1632 assert isinstance(pool_name, basestring)
1633 assert pool_name in self.pools
1634 self.log("removing pool_name %s" % (pool_name,))
1635 del self.pools[pool_name]
1636 self.do_rados(self.controller,
1637 ['rmpool', pool_name, pool_name,
1638 "--yes-i-really-really-mean-it"])
1639
1640 def get_pool(self):
1641 """
1642 Pick a random pool
1643 """
1644 with self.lock:
1645 return random.choice(self.pools.keys())
1646
1647 def get_pool_pg_num(self, pool_name):
1648 """
1649 Return the number of pgs in the pool specified.
1650 """
1651 with self.lock:
1652 assert isinstance(pool_name, basestring)
1653 if pool_name in self.pools:
1654 return self.pools[pool_name]
1655 return 0
1656
1657 def get_pool_property(self, pool_name, prop):
1658 """
1659 :param pool_name: pool
1660 :param prop: property to be checked.
1661 :returns: property as an int value.
1662 """
1663 with self.lock:
1664 assert isinstance(pool_name, basestring)
1665 assert isinstance(prop, basestring)
1666 output = self.raw_cluster_cmd(
1667 'osd',
1668 'pool',
1669 'get',
1670 pool_name,
1671 prop)
1672 return int(output.split()[1])
1673
1674 def set_pool_property(self, pool_name, prop, val):
1675 """
1676 :param pool_name: pool
1677 :param prop: property to be set.
1678 :param val: value to set.
1679
1680 This routine retries if set operation fails.
1681 """
1682 with self.lock:
1683 assert isinstance(pool_name, basestring)
1684 assert isinstance(prop, basestring)
1685 assert isinstance(val, int)
1686 tries = 0
1687 while True:
1688 r = self.raw_cluster_cmd_result(
1689 'osd',
1690 'pool',
1691 'set',
1692 pool_name,
1693 prop,
1694 str(val))
1695 if r != 11: # EAGAIN
1696 break
1697 tries += 1
1698 if tries > 50:
1699 raise Exception('timed out getting EAGAIN '
1700 'when setting pool property %s %s = %s' %
1701 (pool_name, prop, val))
1702 self.log('got EAGAIN setting pool property, '
1703 'waiting a few seconds...')
1704 time.sleep(2)
1705
1706 def expand_pool(self, pool_name, by, max_pgs):
1707 """
1708 Increase the number of pgs in a pool
1709 """
1710 with self.lock:
1711 assert isinstance(pool_name, basestring)
1712 assert isinstance(by, int)
1713 assert pool_name in self.pools
1714 if self.get_num_creating() > 0:
1715 return False
1716 if (self.pools[pool_name] + by) > max_pgs:
1717 return False
1718 self.log("increase pool size by %d" % (by,))
1719 new_pg_num = self.pools[pool_name] + by
1720 self.set_pool_property(pool_name, "pg_num", new_pg_num)
1721 self.pools[pool_name] = new_pg_num
1722 return True
1723
1724 def set_pool_pgpnum(self, pool_name, force):
1725 """
1726 Set pgpnum property of pool_name pool.
1727 """
1728 with self.lock:
1729 assert isinstance(pool_name, basestring)
1730 assert pool_name in self.pools
1731 if not force and self.get_num_creating() > 0:
1732 return False
1733 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
1734 return True
1735
1736 def list_pg_missing(self, pgid):
1737 """
1738 return list of missing pgs with the id specified
1739 """
1740 r = None
1741 offset = {}
1742 while True:
1743 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
1744 json.dumps(offset))
1745 j = json.loads(out)
1746 if r is None:
1747 r = j
1748 else:
1749 r['objects'].extend(j['objects'])
1750 if not 'more' in j:
1751 break
1752 if j['more'] == 0:
1753 break
1754 offset = j['objects'][-1]['oid']
1755 if 'more' in r:
1756 del r['more']
1757 return r
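# list_missing is paginated: each call returns a chunk of objects plus a
# 'more' flag, and the oid of the last object seen is fed back as the offset
# for the next call until 'more' is 0.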
1758
1759 def get_pg_stats(self):
1760 """
1761 Dump the cluster and get pg stats
1762 """
1763 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
1764 j = json.loads('\n'.join(out.split('\n')[1:]))
1765 return j['pg_stats']
1766
1767 def compile_pg_status(self):
1768 """
1769 Return a histogram of pg state values
1770 """
1771 ret = {}
1772 j = self.get_pg_stats()
1773 for pg in j:
1774 for status in pg['state'].split('+'):
1775 if status not in ret:
1776 ret[status] = 0
1777 ret[status] += 1
1778 return ret
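# e.g. pgs in 'active+clean' and 'active+remapped+backfilling' compile to
# something like {'active': 2, 'clean': 1, 'remapped': 1, 'backfilling': 1}.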
1779
1780 @wait_for_pg_stats
1781 def with_pg_state(self, pool, pgnum, check):
1782 pgstr = self.get_pgid(pool, pgnum)
1783 stats = self.get_single_pg_stats(pgstr)
1784 assert(check(stats['state']))
1785
1786 @wait_for_pg_stats
1787 def with_pg(self, pool, pgnum, check):
1788 pgstr = self.get_pgid(pool, pgnum)
1789 stats = self.get_single_pg_stats(pgstr)
1790 return check(stats)
1791
1792 def get_last_scrub_stamp(self, pool, pgnum):
1793 """
1794 Get the timestamp of the last scrub.
1795 """
1796 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
1797 return stats["last_scrub_stamp"]
1798
1799 def do_pg_scrub(self, pool, pgnum, stype):
1800 """
1801 Scrub pg and wait for scrubbing to finish
1802 """
1803 init = self.get_last_scrub_stamp(pool, pgnum)
1804 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
1805 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
1806 SLEEP_TIME = 10
1807 timer = 0
1808 while init == self.get_last_scrub_stamp(pool, pgnum):
1809 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
1810 self.log("waiting for scrub type %s" % (stype,))
1811 if (timer % RESEND_TIMEOUT) == 0:
1812 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
1813 # The first time in this loop is the actual request
1814 if timer != 0 and stype == "repair":
1815 self.log("WARNING: Resubmitted a non-idempotent repair")
1816 time.sleep(SLEEP_TIME)
1817 timer += SLEEP_TIME
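# Illustrative call (pool name assumed): deep scrub pg 0 of pool 'rbd' and
# block until its last_scrub_stamp changes:
#
#   manager.do_pg_scrub('rbd', 0, 'deep-scrub')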
1818
1819 def wait_snap_trimming_complete(self, pool):
1820 """
1821 Wait for snap trimming on pool to end
1822 """
1823 POLL_PERIOD = 10
1824 FATAL_TIMEOUT = 600
1825 start = time.time()
1826 poolnum = self.get_pool_num(pool)
1827 poolnumstr = "%s." % (poolnum,)
1828         while True:
1829             now = time.time()
1830             # give up if trimming has not finished within FATAL_TIMEOUT
1831             assert (now - start) <= FATAL_TIMEOUT, \
1832                 'failed to complete snap trimming before timeout'
1833 all_stats = self.get_pg_stats()
1834 trimming = False
1835 for pg in all_stats:
1836                 if pg['pgid'].startswith(poolnumstr) and ('snaptrim' in pg['state']):
1837 self.log("pg {pg} in trimming, state: {state}".format(
1838 pg=pg['pgid'],
1839 state=pg['state']))
1840 trimming = True
1841 if not trimming:
1842 break
1843 self.log("{pool} still trimming, waiting".format(pool=pool))
1844 time.sleep(POLL_PERIOD)
1845
1846 def get_single_pg_stats(self, pgid):
1847 """
1848 Return pg for the pgid specified.
1849 """
1850 all_stats = self.get_pg_stats()
1851
1852 for pg in all_stats:
1853 if pg['pgid'] == pgid:
1854 return pg
1855
1856 return None
1857
1858 def get_object_pg_with_shard(self, pool, name, osdid):
1859 """
1860 """
1861 pool_dump = self.get_pool_dump(pool)
1862 object_map = self.get_object_map(pool, name)
1863 if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
1864 shard = object_map['acting'].index(osdid)
1865 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
1866 shard=shard)
1867 else:
1868 return object_map['pgid']
1869
1870 def get_object_primary(self, pool, name):
1871 """
1872 """
1873 object_map = self.get_object_map(pool, name)
1874 return object_map['acting_primary']
1875
1876 def get_object_map(self, pool, name):
1877 """
1878 osd map --format=json converted to a python object
1879 :returns: the python object
1880 """
1881 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
1882 return json.loads('\n'.join(out.split('\n')[1:]))
1883
1884 def get_osd_dump_json(self):
1885 """
1886 osd dump --format=json converted to a python object
1887 :returns: the python object
1888 """
1889 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
1890 return json.loads('\n'.join(out.split('\n')[1:]))
1891
1892 def get_osd_dump(self):
1893 """
1894 Dump osds
1895 :returns: all osds
1896 """
1897 return self.get_osd_dump_json()['osds']
1898
1899 def get_stuck_pgs(self, type_, threshold):
1900 """
1901 :returns: stuck pg information from the cluster
1902 """
1903 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
1904 '--format=json')
1905 return json.loads(out)
1906
1907 def get_num_unfound_objects(self):
1908 """
1909 Check cluster status to get the number of unfound objects
1910 """
1911 status = self.raw_cluster_status()
1912 self.log(status)
1913 return status['pgmap'].get('unfound_objects', 0)
1914
1915 def get_num_creating(self):
1916 """
1917 Find the number of pgs in creating mode.
1918 """
1919 pgs = self.get_pg_stats()
1920 num = 0
1921 for pg in pgs:
1922 if 'creating' in pg['state']:
1923 num += 1
1924 return num
1925
1926 def get_num_active_clean(self):
1927 """
1928 Find the number of active and clean pgs.
1929 """
1930 pgs = self.get_pg_stats()
1931 num = 0
1932 for pg in pgs:
1933 if (pg['state'].count('active') and
1934 pg['state'].count('clean') and
1935 not pg['state'].count('stale')):
1936 num += 1
1937 return num
1938
1939 def get_num_active_recovered(self):
1940 """
1941 Find the number of active and recovered pgs.
1942 """
1943 pgs = self.get_pg_stats()
1944 num = 0
1945 for pg in pgs:
1946 if (pg['state'].count('active') and
1947 not pg['state'].count('recover') and
1948 not pg['state'].count('backfill') and
1949 not pg['state'].count('stale')):
1950 num += 1
1951 return num
1952
1953 def get_is_making_recovery_progress(self):
1954 """
1955 Return whether there is recovery progress discernable in the
1956 raw cluster status
1957 """
1958 status = self.raw_cluster_status()
1959 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
1960 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
1961 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
1962 return kps > 0 or bps > 0 or ops > 0
1963
1964 def get_num_active(self):
1965 """
1966 Find the number of active pgs.
1967 """
1968 pgs = self.get_pg_stats()
1969 num = 0
1970 for pg in pgs:
1971 if pg['state'].count('active') and not pg['state'].count('stale'):
1972 num += 1
1973 return num
1974
1975 def get_num_down(self):
1976 """
1977         Find the number of pgs that are down or incomplete (and not stale).
1978 """
1979 pgs = self.get_pg_stats()
1980 num = 0
1981 for pg in pgs:
1982 if ((pg['state'].count('down') and not
1983 pg['state'].count('stale')) or
1984 (pg['state'].count('incomplete') and not
1985 pg['state'].count('stale'))):
1986 num += 1
1987 return num
1988
1989 def get_num_active_down(self):
1990 """
1991         Find the number of pgs that are active, down, or incomplete (and not stale).
1992 """
1993 pgs = self.get_pg_stats()
1994 num = 0
1995 for pg in pgs:
1996 if ((pg['state'].count('active') and not
1997 pg['state'].count('stale')) or
1998 (pg['state'].count('down') and not
1999 pg['state'].count('stale')) or
2000 (pg['state'].count('incomplete') and not
2001 pg['state'].count('stale'))):
2002 num += 1
2003 return num
2004
2005 def is_clean(self):
2006 """
2007 True if all pgs are clean
2008 """
2009 return self.get_num_active_clean() == self.get_num_pgs()
2010
2011 def is_recovered(self):
2012 """
2013 True if all pgs have recovered
2014 """
2015 return self.get_num_active_recovered() == self.get_num_pgs()
2016
2017 def is_active_or_down(self):
2018 """
2019 True if all pgs are active or down
2020 """
2021 return self.get_num_active_down() == self.get_num_pgs()
2022
2023 def wait_for_clean(self, timeout=None):
2024 """
2025         Block until all pgs are clean, or until the timeout (if given) expires.
2026 """
2027 self.log("waiting for clean")
2028 start = time.time()
2029 num_active_clean = self.get_num_active_clean()
2030 while not self.is_clean():
2031 if timeout is not None:
2032 if self.get_is_making_recovery_progress():
2033 self.log("making progress, resetting timeout")
2034 start = time.time()
2035 else:
2036 self.log("no progress seen, keeping timeout for now")
2037 if time.time() - start >= timeout:
2038 self.log('dumping pgs')
2039 out = self.raw_cluster_cmd('pg', 'dump')
2040 self.log(out)
2041 assert time.time() - start < timeout, \
2042 'failed to become clean before timeout expired'
2043 cur_active_clean = self.get_num_active_clean()
2044 if cur_active_clean != num_active_clean:
2045 start = time.time()
2046 num_active_clean = cur_active_clean
2047 time.sleep(3)
2048 self.log("clean!")
2049
2050 def are_all_osds_up(self):
2051 """
2052 Returns true if all osds are up.
2053 """
2054 x = self.get_osd_dump()
2055         return all(osd['up'] > 0 for osd in x)
2056
2057 def wait_for_all_up(self, timeout=None):
2058 """
2059 When this exits, either the timeout has expired, or all
2060 osds are up.
2061 """
2062 self.log("waiting for all up")
2063 start = time.time()
2064 while not self.are_all_osds_up():
2065 if timeout is not None:
2066 assert time.time() - start < timeout, \
2067 'timeout expired in wait_for_all_up'
2068 time.sleep(3)
2069 self.log("all up!")
2070
2071 def wait_for_recovery(self, timeout=None):
2072 """
2073         Check peering. When this exits, we have recovered.
2074 """
2075 self.log("waiting for recovery to complete")
2076 start = time.time()
2077 num_active_recovered = self.get_num_active_recovered()
2078 while not self.is_recovered():
2079 now = time.time()
2080 if timeout is not None:
2081 if self.get_is_making_recovery_progress():
2082 self.log("making progress, resetting timeout")
2083 start = time.time()
2084 else:
2085 self.log("no progress seen, keeping timeout for now")
2086 if now - start >= timeout:
2087 self.log('dumping pgs')
2088 out = self.raw_cluster_cmd('pg', 'dump')
2089 self.log(out)
2090 assert now - start < timeout, \
2091 'failed to recover before timeout expired'
2092 cur_active_recovered = self.get_num_active_recovered()
2093 if cur_active_recovered != num_active_recovered:
2094 start = time.time()
2095 num_active_recovered = cur_active_recovered
2096 time.sleep(3)
2097 self.log("recovered!")
2098
2099 def wait_for_active(self, timeout=None):
2100 """
2101         Check peering. When this exits, we are definitely active.
2102 """
2103 self.log("waiting for peering to complete")
2104 start = time.time()
2105 num_active = self.get_num_active()
2106 while not self.is_active():
2107 if timeout is not None:
2108 if time.time() - start >= timeout:
2109 self.log('dumping pgs')
2110 out = self.raw_cluster_cmd('pg', 'dump')
2111 self.log(out)
2112 assert time.time() - start < timeout, \
2113 'failed to recover before timeout expired'
2114 cur_active = self.get_num_active()
2115 if cur_active != num_active:
2116 start = time.time()
2117 num_active = cur_active
2118 time.sleep(3)
2119 self.log("active!")
2120
2121 def wait_for_active_or_down(self, timeout=None):
2122 """
2123         Check peering. When this exits, every pg is definitely either
2124         active or down.
2125 """
2126 self.log("waiting for peering to complete or become blocked")
2127 start = time.time()
2128 num_active_down = self.get_num_active_down()
2129 while not self.is_active_or_down():
2130 if timeout is not None:
2131 if time.time() - start >= timeout:
2132 self.log('dumping pgs')
2133 out = self.raw_cluster_cmd('pg', 'dump')
2134 self.log(out)
2135 assert time.time() - start < timeout, \
2136 'failed to recover before timeout expired'
2137 cur_active_down = self.get_num_active_down()
2138 if cur_active_down != num_active_down:
2139 start = time.time()
2140 num_active_down = cur_active_down
2141 time.sleep(3)
2142 self.log("active or down!")
2143
2144 def osd_is_up(self, osd):
2145 """
2146 Wrapper for osd check
2147 """
2148 osds = self.get_osd_dump()
2149 return osds[osd]['up'] > 0
2150
2151 def wait_till_osd_is_up(self, osd, timeout=None):
2152 """
2153 Loop waiting for osd.
2154 """
2155 self.log('waiting for osd.%d to be up' % osd)
2156 start = time.time()
2157 while not self.osd_is_up(osd):
2158 if timeout is not None:
2159 assert time.time() - start < timeout, \
2160 'osd.%d failed to come up before timeout expired' % osd
2161 time.sleep(3)
2162 self.log('osd.%d is up' % osd)
2163
2164 def is_active(self):
2165 """
2166 Wrapper to check if all pgs are active
2167 """
2168 return self.get_num_active() == self.get_num_pgs()
2169
2170 def wait_till_active(self, timeout=None):
2171 """
2172 Wait until all pgs are active.
2173 """
2174 self.log("waiting till active")
2175 start = time.time()
2176 while not self.is_active():
2177 if timeout is not None:
2178 if time.time() - start >= timeout:
2179 self.log('dumping pgs')
2180 out = self.raw_cluster_cmd('pg', 'dump')
2181 self.log(out)
2182 assert time.time() - start < timeout, \
2183 'failed to become active before timeout expired'
2184 time.sleep(3)
2185 self.log("active!")
2186
2187 def mark_out_osd(self, osd):
2188 """
2189 Wrapper to mark osd out.
2190 """
2191 self.raw_cluster_cmd('osd', 'out', str(osd))
2192
2193 def kill_osd(self, osd):
2194 """
2195 Kill osds by either power cycling (if indicated by the config)
2196 or by stopping.
2197 """
2198 if self.config.get('powercycle'):
2199 remote = self.find_remote('osd', osd)
2200 self.log('kill_osd on osd.{o} '
2201 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2202 self._assert_ipmi(remote)
2203 remote.console.power_off()
2204 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2205 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2206 self.raw_cluster_cmd(
2207 '--', 'tell', 'osd.%d' % osd,
2208 'injectargs',
2209 '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
2210 )
2211 try:
2212 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2213                     except Exception:
2214 pass
2215 else:
2216 raise RuntimeError('osd.%s did not fail' % osd)
2217 else:
2218 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2219 else:
2220 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2221
2222 @staticmethod
2223 def _assert_ipmi(remote):
2224 assert remote.console.has_ipmi_credentials, (
2225 "powercycling requested but RemoteConsole is not "
2226 "initialized. Check ipmi config.")
2227
2228 def blackhole_kill_osd(self, osd):
2229 """
2230 Stop osd if nothing else works.
2231 """
2232 self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
2233 'injectargs',
2234 '--objectstore-blackhole')
2235 time.sleep(2)
2236 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2237
2238 def revive_osd(self, osd, timeout=150, skip_admin_check=False):
2239 """
2240 Revive osds by either power cycling (if indicated by the config)
2241 or by restarting.
2242 """
2243 if self.config.get('powercycle'):
2244 remote = self.find_remote('osd', osd)
2245             self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2246                      format(o=osd, s=remote.name))
2247 self._assert_ipmi(remote)
2248 remote.console.power_on()
2249 if not remote.console.check_status(300):
2250 raise Exception('Failed to revive osd.{o} via ipmi'.
2251 format(o=osd))
2252 teuthology.reconnect(self.ctx, 60, [remote])
2253 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2254 self.make_admin_daemon_dir(remote)
2255 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2256 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2257
2258 if not skip_admin_check:
2259 # wait for dump_ops_in_flight; this command doesn't appear
2260 # until after the signal handler is installed and it is safe
2261 # to stop the osd again without making valgrind leak checks
2262 # unhappy. see #5924.
2263 self.wait_run_admin_socket('osd', osd,
2264 args=['dump_ops_in_flight'],
2265 timeout=timeout, stdout=DEVNULL)
2266
2267 def mark_down_osd(self, osd):
2268 """
2269 Cluster command wrapper
2270 """
2271 self.raw_cluster_cmd('osd', 'down', str(osd))
2272
2273 def mark_in_osd(self, osd):
2274 """
2275 Cluster command wrapper
2276 """
2277 self.raw_cluster_cmd('osd', 'in', str(osd))
2278
2279 def signal_osd(self, osd, sig, silent=False):
2280 """
2281 Wrapper to local get_daemon call which sends the given
2282 signal to the given osd.
2283 """
2284 self.ctx.daemons.get_daemon('osd', osd,
2285 self.cluster).signal(sig, silent=silent)
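# Illustrative call: deliver SIGHUP to osd.0 via the daemon helper (the
# signal module is already imported at the top of this file):
#
#   manager.signal_osd(0, signal.SIGHUP)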
2286
2287 ## monitors
2288 def signal_mon(self, mon, sig, silent=False):
2289 """
2290         Wrapper to local get_daemon call which sends the given signal to the given mon.
2291 """
2292 self.ctx.daemons.get_daemon('mon', mon,
2293 self.cluster).signal(sig, silent=silent)
2294
2295 def kill_mon(self, mon):
2296 """
2297 Kill the monitor by either power cycling (if the config says so),
2298 or by doing a stop.
2299 """
2300 if self.config.get('powercycle'):
2301 remote = self.find_remote('mon', mon)
2302 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2303 format(m=mon, s=remote.name))
2304 self._assert_ipmi(remote)
2305 remote.console.power_off()
2306 else:
2307 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2308
2309 def revive_mon(self, mon):
2310 """
2311 Restart by either power cycling (if the config says so),
2312 or by doing a normal restart.
2313 """
2314 if self.config.get('powercycle'):
2315 remote = self.find_remote('mon', mon)
2316 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2317 format(m=mon, s=remote.name))
2318 self._assert_ipmi(remote)
2319 remote.console.power_on()
2320 self.make_admin_daemon_dir(remote)
2321 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2322
2323 def revive_mgr(self, mgr):
2324 """
2325 Restart by either power cycling (if the config says so),
2326 or by doing a normal restart.
2327 """
2328 if self.config.get('powercycle'):
2329 remote = self.find_remote('mgr', mgr)
2330 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2331 format(m=mgr, s=remote.name))
2332 self._assert_ipmi(remote)
2333 remote.console.power_on()
2334 self.make_admin_daemon_dir(remote)
2335 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2336
2337 def get_mon_status(self, mon):
2338 """
2339 Extract all the monitor status information from the cluster
2340 """
2341 addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
2342 out = self.raw_cluster_cmd('-m', addr, 'mon_status')
2343 return json.loads(out)
2344
2345 def get_mon_quorum(self):
2346 """
2347 Extract monitor quorum information from the cluster
2348 """
2349 out = self.raw_cluster_cmd('quorum_status')
2350 j = json.loads(out)
2351 self.log('quorum_status is %s' % out)
2352 return j['quorum']
2353
2354 def wait_for_mon_quorum_size(self, size, timeout=300):
2355 """
2356 Loop until quorum size is reached.
2357 """
2358 self.log('waiting for quorum size %d' % size)
2359 start = time.time()
2360         while len(self.get_mon_quorum()) != size:
2361 if timeout is not None:
2362 assert time.time() - start < timeout, \
2363 ('failed to reach quorum size %d '
2364 'before timeout expired' % size)
2365 time.sleep(3)
2366 self.log("quorum is size %d" % size)
2367
2368 def get_mon_health(self, debug=False):
2369 """
2370 Extract all the monitor health information.
2371 """
2372 out = self.raw_cluster_cmd('health', '--format=json')
2373 if debug:
2374 self.log('health:\n{h}'.format(h=out))
2375 return json.loads(out)
2376
2377 def get_mds_status(self, mds):
2378 """
2379 Run cluster commands for the mds in order to get mds information
2380 """
2381 out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
2382 j = json.loads(' '.join(out.splitlines()[1:]))
2383 # collate; for dup ids, larger gid wins.
2384 for info in j['info'].itervalues():
2385 if info['name'] == mds:
2386 return info
2387 return None
2388
2389 def get_filepath(self):
2390 """
2391 Return path to osd data with {id} needing to be replaced
2392 """
2393 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2394
2395 def make_admin_daemon_dir(self, remote):
2396 """
2397 Create /var/run/ceph directory on remote site.
2398
2399 :param ctx: Context
2400 :param remote: Remote site
2401 """
2402 remote.run(args=['sudo',
2403 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2404
2405
2406 def utility_task(name):
2407 """
2408 Generate ceph_manager subtask corresponding to ceph_manager
2409 method name
2410 """
2411 def task(ctx, config):
2412 if config is None:
2413 config = {}
2414 args = config.get('args', [])
2415 kwargs = config.get('kwargs', {})
2416 cluster = config.get('cluster', 'ceph')
2417 fn = getattr(ctx.managers[cluster], name)
2418 fn(*args, **kwargs)
2419 return task
2420
2421 revive_osd = utility_task("revive_osd")
2422 revive_mon = utility_task("revive_mon")
2423 kill_osd = utility_task("kill_osd")
2424 kill_mon = utility_task("kill_mon")
2425 create_pool = utility_task("create_pool")
2426 remove_pool = utility_task("remove_pool")
2427 wait_for_clean = utility_task("wait_for_clean")
2428 set_pool_property = utility_task("set_pool_property")
2429 do_pg_scrub = utility_task("do_pg_scrub")
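# Illustrative teuthology job fragment (task names and layout assumed from
# how utility_task() unpacks 'args', 'kwargs' and 'cluster'):
#
#   tasks:
#   - ceph_manager.kill_osd:
#       kwargs:
#         osd: 0
#   - ceph_manager.wait_for_clean:
#       kwargs:
#         timeout: 300
#   - ceph_manager.revive_osd:
#       kwargs:
#         osd: 0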