ceph/qa/tasks/ceph_manager.py (Ceph 15.2.0 Octopus source)
1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from functools import wraps
5 import contextlib
6 import random
7 import signal
8 import time
9 import gevent
10 import base64
11 import json
12 import logging
13 import threading
14 import traceback
15 import os
16 import six
17
18 from io import BytesIO
19 from teuthology import misc as teuthology
20 from tasks.scrub import Scrubber
21 from tasks.util.rados import cmd_erasure_code_profile
22 from tasks.util import get_remote
23 from teuthology.contextutil import safe_while
24 from teuthology.orchestra.remote import Remote
25 from teuthology.orchestra import run
26 from teuthology.exceptions import CommandFailedError
27 from tasks.thrasher import Thrasher
28 from six import StringIO
29
30 try:
31 from subprocess import DEVNULL # py3k
32 except ImportError:
33 DEVNULL = open(os.devnull, 'r+') # type: ignore
34
35 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
36
37 log = logging.getLogger(__name__)
38
39 # this is for cephadm clusters
40 def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
41 testdir = teuthology.get_testdir(ctx)
42 extra_args = []
43 if name:
44 extra_args = ['-n', name]
45 return remote.run(
46 args=[
47 'sudo',
48 ctx.cephadm,
49 '--image', ctx.ceph[cluster_name].image,
50 'shell',
51 ] + extra_args + [
52 '--fsid', ctx.ceph[cluster_name].fsid,
53 '--',
54 ] + args,
55 **kwargs
56 )
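# Illustrative only: on a cephadm-managed cluster this helper ends up running
# something roughly like
#   sudo <ctx.cephadm> --image <image> shell [-n osd.3] --fsid <fsid> -- <args...>
# on the given remote; the image and fsid come from ctx.ceph[cluster_name], and
# 'osd.3' above is just a placeholder for the optional name argument.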
57
58 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
59 conf_fp = BytesIO()
60 ctx.ceph[cluster].conf.write(conf_fp)
61 conf_fp.seek(0)
62 writes = ctx.cluster.run(
63 args=[
64 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
65 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
66 'sudo', 'tee', conf_path, run.Raw('&&'),
67 'sudo', 'chmod', '0644', conf_path,
68 run.Raw('>'), '/dev/null',
69
70 ],
71 stdin=run.PIPE,
72 wait=False)
73 teuthology.feed_many_stdins_and_close(conf_fp, writes)
74 run.wait(writes)
75
76
77 def mount_osd_data(ctx, remote, cluster, osd):
78 """
79 Mount a remote OSD
80
81 :param ctx: Context
82 :param remote: Remote site
83 :param cluster: name of ceph cluster
84 :param osd: Osd name
85 """
86 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
87 role = "{0}.osd.{1}".format(cluster, osd)
88 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
89 if remote in ctx.disk_config.remote_to_roles_to_dev:
90 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
91 role = alt_role
92 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
93 return
94 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
95 mount_options = ctx.disk_config.\
96 remote_to_roles_to_dev_mount_options[remote][role]
97 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
98 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
99
100 log.info('Mounting osd.{o}: dev: {n}, cluster: {c}, '
101 'mountpoint: {p}, type: {t}, options: {v}'.format(
102 o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
103 c=cluster))
104
105 remote.run(
106 args=[
107 'sudo',
108 'mount',
109 '-t', fstype,
110 '-o', ','.join(mount_options),
111 dev,
112 mnt,
113 ]
114 )
115
116
117 def log_exc(func):
118 @wraps(func)
119 def wrapper(self):
120 try:
121 return func(self)
122 except:
123 self.log(traceback.format_exc())
124 raise
125 return wrapper
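# log_exc is used below on the greenlet bodies (do_sighup, do_optrack_toggle,
# do_dump_ops, do_noscrub_toggle, _do_thrash) so that a failure inside a spawned
# greenlet is logged with its full traceback via self.log() before being
# re-raised, rather than surfacing only when the greenlet is joined.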
126
127
128 class PoolType:
129 REPLICATED = 1
130 ERASURE_CODED = 3
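# These values mirror the pool 'type' field reported by 'ceph osd dump'
# (1 = replicated, 3 = erasure coded); test_pool_min_size() below compares
# pool_json['type'] against them.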
131
132
133 class OSDThrasher(Thrasher):
134 """
135 Object used to thrash Ceph
136 """
137 def __init__(self, manager, config, name, logger):
138 super(OSDThrasher, self).__init__()
139
140 self.ceph_manager = manager
141 self.cluster = manager.cluster
142 self.ceph_manager.wait_for_clean()
143 osd_status = self.ceph_manager.get_osd_status()
144 self.in_osds = osd_status['in']
145 self.live_osds = osd_status['live']
146 self.out_osds = osd_status['out']
147 self.dead_osds = osd_status['dead']
148 self.stopping = False
149 self.logger = logger
150 self.config = config
151 self.name = name
152 self.revive_timeout = self.config.get("revive_timeout", 360)
153 self.pools_to_fix_pgp_num = set()
154 if self.config.get('powercycle'):
155 self.revive_timeout += 120
156 self.clean_wait = self.config.get('clean_wait', 0)
157 self.minin = self.config.get("min_in", 4)
158 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
159 self.sighup_delay = self.config.get('sighup_delay')
160 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
161 self.dump_ops_enable = self.config.get('dump_ops_enable')
162 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
163 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
164 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
165 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
166 self.random_eio = self.config.get('random_eio', 0.0)
167 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
168
169 num_osds = self.in_osds + self.out_osds
170 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
171 self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
172 if self.config is None:
173 self.config = dict()
174 # prevent monitor from auto-marking things out while thrasher runs
175 # try both old and new tell syntax, in case we are testing old code
176 self.saved_options = []
177 # assuming that the default settings do not vary from one daemon to
178 # another
179 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
180 opts = [('mon', 'mon_osd_down_out_interval', 0)]
181 #why do we disable marking an OSD out automatically? :/
182 for service, opt, new_value in opts:
183 old_value = manager.get_config(first_mon[0],
184 first_mon[1],
185 opt)
186 self.saved_options.append((service, opt, old_value))
187 manager.inject_args(service, '*', opt, new_value)
188 # initialize ceph_objectstore_tool property - must be done before
189 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
190 if (self.config.get('powercycle') or
191 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
192 self.config.get('disable_objectstore_tool_tests', False)):
193 self.ceph_objectstore_tool = False
194 if self.config.get('powercycle'):
195 self.log("Unable to test ceph-objectstore-tool, "
196 "powercycle testing")
197 else:
198 self.log("Unable to test ceph-objectstore-tool, "
199 "not available on all OSD nodes")
200 else:
201 self.ceph_objectstore_tool = \
202 self.config.get('ceph_objectstore_tool', True)
203 # spawn do_thrash
204 self.thread = gevent.spawn(self.do_thrash)
205 if self.sighup_delay:
206 self.sighup_thread = gevent.spawn(self.do_sighup)
207 if self.optrack_toggle_delay:
208 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
209 if self.dump_ops_enable == "true":
210 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
211 if self.noscrub_toggle_delay:
212 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
213
214 def log(self, msg, *args, **kwargs):
215 self.logger.info(msg, *args, **kwargs)
216
217 def cmd_exists_on_osds(self, cmd):
218 if self.ceph_manager.cephadm:
219 return True
220 allremotes = self.ceph_manager.ctx.cluster.only(\
221 teuthology.is_type('osd', self.cluster)).remotes.keys()
222 allremotes = list(set(allremotes))
223 for remote in allremotes:
224 proc = remote.run(args=['type', cmd], wait=True,
225 check_status=False, stdout=BytesIO(),
226 stderr=BytesIO())
227 if proc.exitstatus != 0:
228 return False
229 return True
230
231 def run_ceph_objectstore_tool(self, remote, osd, cmd):
232 if self.ceph_manager.cephadm:
233 return shell(
234 self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
235 args=['ceph-objectstore-tool'] + cmd,
236 name=osd,
237 wait=True, check_status=False,
238 stdout=StringIO(),
239 stderr=StringIO())
240 else:
241 return remote.run(
242 args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd,
243 wait=True, check_status=False,
244 stdout=StringIO(),
245 stderr=StringIO())
246
247 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
248 """
249 :param osd: Osd to be killed.
250 :param mark_down: Mark down if true.
251 :param mark_out: Mark out if true.
252 """
253 if osd is None:
254 osd = random.choice(self.live_osds)
255 self.log("Killing osd %s, live_osds are %s" % (str(osd),
256 str(self.live_osds)))
257 self.live_osds.remove(osd)
258 self.dead_osds.append(osd)
259 self.ceph_manager.kill_osd(osd)
260 if mark_down:
261 self.ceph_manager.mark_down_osd(osd)
262 if mark_out and osd in self.in_osds:
263 self.out_osd(osd)
264 if self.ceph_objectstore_tool:
265 self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
266 remote = self.ceph_manager.find_remote('osd', osd)
267 FSPATH = self.ceph_manager.get_filepath()
268 JPATH = os.path.join(FSPATH, "journal")
269 exp_osd = imp_osd = osd
270 self.log('remote for osd %s is %s' % (osd, remote))
271 exp_remote = imp_remote = remote
272 # If an older osd is available we'll move a pg from there
273 if (len(self.dead_osds) > 1 and
274 random.random() < self.chance_move_pg):
275 exp_osd = random.choice(self.dead_osds[:-1])
276 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
277 self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
278 prefix = [
279 '--no-mon-config',
280 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
281 ]
282
283 if not self.ceph_manager.cephadm:
284 # ceph-objectstore-tool might be temporarily absent during an
285 # upgrade - see http://tracker.ceph.com/issues/18014
286 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
287 while proceed():
288 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
289 wait=True, check_status=False, stdout=BytesIO(),
290 stderr=BytesIO())
291 if proc.exitstatus == 0:
292 break
293 log.debug("ceph-objectstore-tool binary not present, trying again")
294
295 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
296 # see http://tracker.ceph.com/issues/19556
297 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
298 while proceed():
299 proc = self.run_ceph_objectstore_tool(
300 exp_remote, 'osd.%s' % exp_osd,
301 prefix + [
302 '--data-path', FSPATH.format(id=exp_osd),
303 '--journal-path', JPATH.format(id=exp_osd),
304 '--op', 'list-pgs',
305 ])
306 if proc.exitstatus == 0:
307 break
308 elif (proc.exitstatus == 1 and
309 proc.stderr.getvalue() == "OSD has the store locked"):
310 continue
311 else:
312 raise Exception("ceph-objectstore-tool: "
313 "exp list-pgs failure with status {ret}".
314 format(ret=proc.exitstatus))
315
316 pgs = six.ensure_str(proc.stdout.getvalue()).split('\n')[:-1]
317 if len(pgs) == 0:
318 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
319 return
320 pg = random.choice(pgs)
321 #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
322 #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
323 exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
324 "exp.{pg}.{id}".format(
325 pg=pg,
326 id=exp_osd))
327 if self.ceph_manager.cephadm:
328 exp_host_path = os.path.join(
329 '/var/log/ceph',
330 self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
331 "exp.{pg}.{id}".format(
332 pg=pg,
333 id=exp_osd))
334 else:
335 exp_host_path = exp_path
336
337 # export
338 # Can't use new export-remove op since this is part of upgrade testing
339 proc = self.run_ceph_objectstore_tool(
340 exp_remote, 'osd.%s' % exp_osd,
341 prefix + [
342 '--data-path', FSPATH.format(id=exp_osd),
343 '--journal-path', JPATH.format(id=exp_osd),
344 '--op', 'export',
345 '--pgid', pg,
346 '--file', exp_path,
347 ])
348 if proc.exitstatus:
349 raise Exception("ceph-objectstore-tool: "
350 "export failure with status {ret}".
351 format(ret=proc.exitstatus))
352 # remove
353 proc = self.run_ceph_objectstore_tool(
354 exp_remote, 'osd.%s' % exp_osd,
355 prefix + [
356 '--data-path', FSPATH.format(id=exp_osd),
357 '--journal-path', JPATH.format(id=exp_osd),
358 '--force',
359 '--op', 'remove',
360 '--pgid', pg,
361 ])
362 if proc.exitstatus:
363 raise Exception("ceph-objectstore-tool: "
364 "remove failure with status {ret}".
365 format(ret=proc.exitstatus))
366 # If there are at least 2 dead osds we might move the pg
367 if exp_osd != imp_osd:
368 # If pg isn't already on this osd, then we will move it there
369 proc = self.run_ceph_objectstore_tool(
370 imp_remote,
371 'osd.%s' % imp_osd,
372 prefix + [
373 '--data-path', FSPATH.format(id=imp_osd),
374 '--journal-path', JPATH.format(id=imp_osd),
375 '--op', 'list-pgs',
376 ])
377 if proc.exitstatus:
378 raise Exception("ceph-objectstore-tool: "
379 "imp list-pgs failure with status {ret}".
380 format(ret=proc.exitstatus))
381 pgs = six.ensure_str(proc.stdout.getvalue()).split('\n')[:-1]
382 if pg not in pgs:
383 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
384 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
385 if imp_remote != exp_remote:
386 # Copy export file to the other machine
387 self.log("Transfer export file from {srem} to {trem}".
388 format(srem=exp_remote, trem=imp_remote))
389 # just in case an upgrade makes /var/log/ceph unreadable by non-root
390 exp_remote.run(args=['sudo', 'chmod', '777',
391 '/var/log/ceph'])
392 imp_remote.run(args=['sudo', 'chmod', '777',
393 '/var/log/ceph'])
394 tmpexport = Remote.get_file(exp_remote, exp_host_path,
395 sudo=True)
396 if exp_host_path != exp_path:
397 # push to /var/log/ceph, then rename (we can't
398 # chmod 777 the /var/log/ceph/$fsid mountpoint)
399 Remote.put_file(imp_remote, tmpexport, exp_path)
400 imp_remote.run(args=[
401 'sudo', 'mv', exp_path, exp_host_path])
402 else:
403 Remote.put_file(imp_remote, tmpexport, exp_host_path)
404 os.remove(tmpexport)
405 else:
406 # Can't move the pg after all
407 imp_osd = exp_osd
408 imp_remote = exp_remote
409 # import
410 proc = self.run_ceph_objectstore_tool(
411 imp_remote, 'osd.%s' % imp_osd,
412 [
413 '--data-path', FSPATH.format(id=imp_osd),
414 '--journal-path', JPATH.format(id=imp_osd),
415 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
416 '--op', 'import',
417 '--file', exp_path,
418 ])
419 if proc.exitstatus == 1:
420 bogosity = "The OSD you are using is older than the exported PG"
421 if bogosity in proc.stderr.getvalue():
422 self.log("OSD older than exported PG"
423 "...ignored")
424 elif proc.exitstatus == 10:
425 self.log("Pool went away before processing an import"
426 "...ignored")
427 elif proc.exitstatus == 11:
428 self.log("Attempt to import an incompatible export"
429 "...ignored")
430 elif proc.exitstatus == 12:
431 # this should be safe to ignore because we only ever move 1
432 # copy of the pg at a time, and merge is only initiated when
433 # all replicas are peered and happy. /me crosses fingers
434 self.log("PG merged on target"
435 "...ignored")
436 elif proc.exitstatus:
437 raise Exception("ceph-objectstore-tool: "
438 "import failure with status {ret}".
439 format(ret=proc.exitstatus))
440 cmd = "sudo rm -f {file}".format(file=exp_host_path)
441 exp_remote.run(args=cmd)
442 if imp_remote != exp_remote:
443 imp_remote.run(args=cmd)
444
445 # apply low split settings to each pool
446 if not self.ceph_manager.cephadm:
447 for pool in self.ceph_manager.list_pools():
448 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
449 "--filestore-split-multiple 1' sudo -E "
450 + 'ceph-objectstore-tool '
451 + ' '.join(prefix + [
452 '--data-path', FSPATH.format(id=imp_osd),
453 '--journal-path', JPATH.format(id=imp_osd),
454 ])
455 + " --op apply-layout-settings --pool " + pool).format(id=osd)
456 proc = imp_remote.run(args=cmd,
457 wait=True, check_status=False,
458 stderr=StringIO())
459 if 'Couldn\'t find pool' in proc.stderr.getvalue():
460 continue
461 if proc.exitstatus:
462 raise Exception("ceph-objectstore-tool apply-layout-settings"
463 " failed with {status}".format(status=proc.exitstatus))
464
465
466 def blackhole_kill_osd(self, osd=None):
467 """
468 If all else fails, kill the osd.
469 :param osd: Osd to be killed.
470 """
471 if osd is None:
472 osd = random.choice(self.live_osds)
473 self.log("Blackholing and then killing osd %s, live_osds are %s" %
474 (str(osd), str(self.live_osds)))
475 self.live_osds.remove(osd)
476 self.dead_osds.append(osd)
477 self.ceph_manager.blackhole_kill_osd(osd)
478
479 def revive_osd(self, osd=None, skip_admin_check=False):
480 """
481 Revive the osd.
482 :param osd: Osd to be revived.
483 """
484 if osd is None:
485 osd = random.choice(self.dead_osds)
486 self.log("Reviving osd %s" % (str(osd),))
487 self.ceph_manager.revive_osd(
488 osd,
489 self.revive_timeout,
490 skip_admin_check=skip_admin_check)
491 self.dead_osds.remove(osd)
492 self.live_osds.append(osd)
493 if self.random_eio > 0 and osd == self.rerrosd:
494 self.ceph_manager.set_config(self.rerrosd,
495 filestore_debug_random_read_err = self.random_eio)
496 self.ceph_manager.set_config(self.rerrosd,
497 bluestore_debug_random_read_err = self.random_eio)
498
499
500 def out_osd(self, osd=None):
501 """
502 Mark the osd out
503 :param osd: Osd to be marked.
504 """
505 if osd is None:
506 osd = random.choice(self.in_osds)
507 self.log("Removing osd %s, in_osds are: %s" %
508 (str(osd), str(self.in_osds)))
509 self.ceph_manager.mark_out_osd(osd)
510 self.in_osds.remove(osd)
511 self.out_osds.append(osd)
512
513 def in_osd(self, osd=None):
514 """
515 Mark the osd in
516 :param osd: Osd to be marked.
517 """
518 if osd is None:
519 osd = random.choice(self.out_osds)
520 if osd in self.dead_osds:
521 return self.revive_osd(osd)
522 self.log("Adding osd %s" % (str(osd),))
523 self.out_osds.remove(osd)
524 self.in_osds.append(osd)
525 self.ceph_manager.mark_in_osd(osd)
526 self.log("Added osd %s" % (str(osd),))
527
528 def reweight_osd_or_by_util(self, osd=None):
529 """
530 Reweight an osd that is in
531 :param osd: Osd to be marked.
532 """
533 if osd is not None or random.choice([True, False]):
534 if osd is None:
535 osd = random.choice(self.in_osds)
536 val = random.uniform(.1, 1.0)
537 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
538 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
539 str(osd), str(val))
540 else:
541 # do it several times, the option space is large
542 for i in range(5):
543 options = {
544 'max_change': random.choice(['0.05', '1.0', '3.0']),
545 'overage': random.choice(['110', '1000']),
546 'type': random.choice([
547 'reweight-by-utilization',
548 'test-reweight-by-utilization']),
549 }
550 self.log("Reweighting by: %s"%(str(options),))
551 self.ceph_manager.raw_cluster_cmd(
552 'osd',
553 options['type'],
554 options['overage'],
555 options['max_change'])
556
557 def primary_affinity(self, osd=None):
558 if osd is None:
559 osd = random.choice(self.in_osds)
560 if random.random() >= .5:
561 pa = random.random()
562 elif random.random() >= .5:
563 pa = 1
564 else:
565 pa = 0
566 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
567 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
568 str(osd), str(pa))
569
570 def thrash_cluster_full(self):
571 """
572 Set and unset cluster full condition
573 """
574 self.log('Setting full ratio to .001')
575 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
576 time.sleep(1)
577 self.log('Setting full ratio back to .95')
578 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
579
580 def thrash_pg_upmap(self):
581 """
582 Install or remove random pg_upmap entries in OSDMap
583 """
584 from random import shuffle
585 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
586 j = json.loads(out)
587 self.log('j is %s' % j)
588 try:
589 if random.random() >= .3:
590 pgs = self.ceph_manager.get_pg_stats()
591 if not pgs:
592 return
593 pg = random.choice(pgs)
594 pgid = str(pg['pgid'])
595 poolid = int(pgid.split('.')[0])
596 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
597 if len(sizes) == 0:
598 return
599 n = sizes[0]
600 osds = self.in_osds + self.out_osds
601 shuffle(osds)
602 osds = osds[0:n]
603 self.log('Setting %s to %s' % (pgid, osds))
604 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
605 self.log('cmd %s' % cmd)
606 self.ceph_manager.raw_cluster_cmd(*cmd)
607 else:
608 m = j['pg_upmap']
609 if len(m) > 0:
610 shuffle(m)
611 pg = m[0]['pgid']
612 self.log('Clearing pg_upmap on %s' % pg)
613 self.ceph_manager.raw_cluster_cmd(
614 'osd',
615 'rm-pg-upmap',
616 pg)
617 else:
618 self.log('No pg_upmap entries; doing nothing')
619 except CommandFailedError:
620 self.log('Failed to rm-pg-upmap, ignoring')
621
622 def thrash_pg_upmap_items(self):
623 """
624 Install or remove random pg_upmap_items entries in OSDMap
625 """
626 from random import shuffle
627 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
628 j = json.loads(out)
629 self.log('j is %s' % j)
630 try:
631 if random.random() >= .3:
632 pgs = self.ceph_manager.get_pg_stats()
633 if not pgs:
634 return
635 pg = random.choice(pgs)
636 pgid = str(pg['pgid'])
637 poolid = int(pgid.split('.')[0])
638 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
639 if len(sizes) == 0:
640 return
641 n = sizes[0]
642 osds = self.in_osds + self.out_osds
643 shuffle(osds)
644 osds = osds[0:n*2]
645 self.log('Setting %s to %s' % (pgid, osds))
646 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
647 self.log('cmd %s' % cmd)
648 self.ceph_manager.raw_cluster_cmd(*cmd)
649 else:
650 m = j['pg_upmap_items']
651 if len(m) > 0:
652 shuffle(m)
653 pg = m[0]['pgid']
654 self.log('Clearing pg_upmap on %s' % pg)
655 self.ceph_manager.raw_cluster_cmd(
656 'osd',
657 'rm-pg-upmap-items',
658 pg)
659 else:
660 self.log('No pg_upmap entries; doing nothing')
661 except CommandFailedError:
662 self.log('Failed to rm-pg-upmap-items, ignoring')
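# Note the difference between the two upmap flavours above: 'osd pg-upmap'
# takes a complete target OSD set, so thrash_pg_upmap() picks pool-size many
# OSDs, while 'osd pg-upmap-items' takes (from, to) OSD pairs, which is why
# thrash_pg_upmap_items() slices osds[0:n*2].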
663
664 def force_recovery(self):
665 """
666 Force recovery or backfill on a random subset of PGs
667 """
668 backfill = random.random() >= 0.5
669 j = self.ceph_manager.get_pgids_to_force(backfill)
670 if j:
671 try:
672 if backfill:
673 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
674 else:
675 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
676 except CommandFailedError:
677 self.log('Failed to force backfill|recovery, ignoring')
678
679
680 def cancel_force_recovery(self):
681 """
682 Cancel forced recovery or backfill on a random subset of PGs
683 """
684 backfill = random.random() >= 0.5
685 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
686 if j:
687 try:
688 if backfill:
689 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
690 else:
691 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
692 except CommandFailedError:
693 self.log('Failed to cancel force backfill|recovery, ignoring')
694
695 def force_cancel_recovery(self):
696 """
697 Force or cancel forcing recovery
698 """
699 if random.random() >= 0.4:
700 self.force_recovery()
701 else:
702 self.cancel_force_recovery()
703
704 def all_up(self):
705 """
706 Make sure all osds are up and not out.
707 """
708 while len(self.dead_osds) > 0:
709 self.log("reviving osd")
710 self.revive_osd()
711 while len(self.out_osds) > 0:
712 self.log("inning osd")
713 self.in_osd()
714
715 def all_up_in(self):
716 """
717 Make sure all osds are up and fully in.
718 """
719 self.all_up()
720 for osd in self.live_osds:
721 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
722 str(osd), str(1))
723 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
724 str(osd), str(1))
725
726 def do_join(self):
727 """
728 Break out of this Ceph loop
729 """
730 self.stopping = True
731 self.thread.get()
732 if self.sighup_delay:
733 self.log("joining the do_sighup greenlet")
734 self.sighup_thread.get()
735 if self.optrack_toggle_delay:
736 self.log("joining the do_optrack_toggle greenlet")
737 self.optrack_toggle_thread.join()
738 if self.dump_ops_enable == "true":
739 self.log("joining the do_dump_ops greenlet")
740 self.dump_ops_thread.join()
741 if self.noscrub_toggle_delay:
742 self.log("joining the do_noscrub_toggle greenlet")
743 self.noscrub_toggle_thread.join()
744
745 def grow_pool(self):
746 """
747 Increase the size of the pool
748 """
749 pool = self.ceph_manager.get_pool()
750 if pool is None:
751 return
752 self.log("Growing pool %s" % (pool,))
753 if self.ceph_manager.expand_pool(pool,
754 self.config.get('pool_grow_by', 10),
755 self.max_pgs):
756 self.pools_to_fix_pgp_num.add(pool)
757
758 def shrink_pool(self):
759 """
760 Decrease the size of the pool
761 """
762 pool = self.ceph_manager.get_pool()
763 if pool is None:
764 return
765 _ = self.ceph_manager.get_pool_pg_num(pool)
766 self.log("Shrinking pool %s" % (pool,))
767 if self.ceph_manager.contract_pool(
768 pool,
769 self.config.get('pool_shrink_by', 10),
770 self.min_pgs):
771 self.pools_to_fix_pgp_num.add(pool)
772
773 def fix_pgp_num(self, pool=None):
774 """
775 Fix number of pgs in pool.
776 """
777 if pool is None:
778 pool = self.ceph_manager.get_pool()
779 if not pool:
780 return
781 force = False
782 else:
783 force = True
784 self.log("fixing pg num pool %s" % (pool,))
785 if self.ceph_manager.set_pool_pgpnum(pool, force):
786 self.pools_to_fix_pgp_num.discard(pool)
787
788 def test_pool_min_size(self):
789 """
790 Loop to selectively push PGs below their min_size and test that recovery
791 still occurs.
792 """
793 self.log("test_pool_min_size")
794 self.all_up()
795 self.ceph_manager.wait_for_recovery(
796 timeout=self.config.get('timeout')
797 )
798
799 minout = int(self.config.get("min_out", 1))
800 minlive = int(self.config.get("min_live", 2))
801 mindead = int(self.config.get("min_dead", 1))
802 self.log("doing min_size thrashing")
803 self.ceph_manager.wait_for_clean(timeout=60)
804 assert self.ceph_manager.is_clean(), \
805 'not clean before minsize thrashing starts'
806 while not self.stopping:
807 # look up k and m from all the pools on each loop, in case it
808 # changes as the cluster runs
809 k = 0
810 m = 99
811 has_pools = False
812 pools_json = self.ceph_manager.get_osd_dump_json()['pools']
813
814 for pool_json in pools_json:
815 pool = pool_json['pool_name']
816 has_pools = True
817 pool_type = pool_json['type'] # 1 for rep, 3 for ec
818 min_size = pool_json['min_size']
819 self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
820 try:
821 ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
822 if pool_type != PoolType.ERASURE_CODED:
823 continue
824 ec_profile = pool_json['erasure_code_profile']
825 ec_profile_json = self.ceph_manager.raw_cluster_cmd(
826 'osd',
827 'erasure-code-profile',
828 'get',
829 ec_profile,
830 '--format=json')
831 ec_json = json.loads(ec_profile_json)
832 local_k = int(ec_json['k'])
833 local_m = int(ec_json['m'])
834 self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
835 k=local_k, m=local_m))
836 if local_k > k:
837 self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
838 k = local_k
839 if local_m < m:
840 self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
841 m = local_m
842 except CommandFailedError:
843 self.log("failed to read erasure_code_profile. %s was likely removed", pool)
844 continue
845
846 if has_pools :
847 self.log("using k={k}, m={m}".format(k=k,m=m))
848 else:
849 self.log("No pools yet, waiting")
850 time.sleep(5)
851 continue
852
853 if minout > len(self.out_osds): # kill OSDs and mark out
854 self.log("forced to out an osd")
855 self.kill_osd(mark_out=True)
856 continue
857 elif mindead > len(self.dead_osds): # kill OSDs but force timeout
858 self.log("forced to kill an osd")
859 self.kill_osd()
860 continue
861 else: # make mostly-random choice to kill or revive OSDs
862 minup = max(minlive, k)
863 rand_val = random.uniform(0, 1)
864 self.log("choosing based on number of live OSDs and rand val {rand}".\
865 format(rand=rand_val))
866 if len(self.live_osds) > minup+1 and rand_val < 0.5:
867 # chose to knock out as many OSDs as we can w/out downing PGs
868
869 most_killable = min(len(self.live_osds) - minup, m)
870 self.log("chose to kill {n} OSDs".format(n=most_killable))
871 for i in range(1, most_killable):
872 self.kill_osd(mark_out=True)
873 time.sleep(10)
874 # try a few times since there might be a concurrent pool
875 # creation or deletion
876 with safe_while(
877 sleep=5, tries=5,
878 action='check for active or peered') as proceed:
879 while proceed():
880 if self.ceph_manager.all_active_or_peered():
881 break
882 self.log('not all PGs are active or peered')
883 else: # chose to revive OSDs, bring up a random fraction of the dead ones
884 self.log("chose to revive osds")
885 for i in range(1, int(rand_val * len(self.dead_osds))):
886 self.revive_osd(i)
887
888 # let PGs repair themselves or our next knockout might kill one
889 self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
890
891 # / while not self.stopping
892 self.all_up_in()
893
894 self.ceph_manager.wait_for_recovery(
895 timeout=self.config.get('timeout')
896 )
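# Sketch of the sizing logic above: k is the largest erasure-code k across the
# pools (the minimum number of shards a PG needs to serve data), so
# minup = max(minlive, k) keeps at least that many OSDs alive; m (the smallest
# erasure-code m seen) caps how many OSDs a single pass may kill, so PGs may
# drop below min_size but should still end up active or peered, which the
# safe_while check after the kills verifies.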
897
898 def inject_pause(self, conf_key, duration, check_after, should_be_down):
899 """
900 Pause injection testing. Check for osd being down when finished.
901 """
902 the_one = random.choice(self.live_osds)
903 self.log("inject_pause on {osd}".format(osd=the_one))
904 self.log(
905 "Testing {key} pause injection for duration {duration}".format(
906 key=conf_key,
907 duration=duration
908 ))
909 self.log(
910 "Checking after {after}, should_be_down={shouldbedown}".format(
911 after=check_after,
912 shouldbedown=should_be_down
913 ))
914 self.ceph_manager.set_config(the_one, **{conf_key: duration})
915 if not should_be_down:
916 return
917 time.sleep(check_after)
918 status = self.ceph_manager.get_osd_status()
919 assert the_one in status['down']
920 time.sleep(duration - check_after + 20)
921 status = self.ceph_manager.get_osd_status()
922 assert the_one not in status['down']
923
924 def test_backfill_full(self):
925 """
926 Test backfills stopping when the replica fills up.
927
928 First, use injectfull admin command to simulate a now full
929 osd by setting it to 0 on all of the OSDs.
930
931 Second, on a random subset, set
932 osd_debug_skip_full_check_in_backfill_reservation to force
933 the more complicated check in do_scan to be exercised.
934
935 Then, verify that all backfillings stop.
936 """
937 self.log("injecting backfill full")
938 for i in self.live_osds:
939 self.ceph_manager.set_config(
940 i,
941 osd_debug_skip_full_check_in_backfill_reservation=
942 random.choice(['false', 'true']))
943 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
944 check_status=True, timeout=30, stdout=DEVNULL)
945 for i in range(30):
946 status = self.ceph_manager.compile_pg_status()
947 if 'backfilling' not in status.keys():
948 break
949 self.log(
950 "waiting for {still_going} backfillings".format(
951 still_going=status.get('backfilling')))
952 time.sleep(1)
953 assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
954 for i in self.live_osds:
955 self.ceph_manager.set_config(
956 i,
957 osd_debug_skip_full_check_in_backfill_reservation='false')
958 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
959 check_status=True, timeout=30, stdout=DEVNULL)
960
961 def test_map_discontinuity(self):
962 """
963 1) Allows the osds to recover
964 2) kills an osd
965 3) allows the remaining osds to recover
966 4) waits for some time
967 5) revives the osd
968 This sequence should cause the revived osd to have to handle
969 a map gap since the mons would have trimmed
970 """
971 while len(self.in_osds) < (self.minin + 1):
972 self.in_osd()
973 self.log("Waiting for recovery")
974 self.ceph_manager.wait_for_all_osds_up(
975 timeout=self.config.get('timeout')
976 )
977 # now we wait 20s for the pg status to change, if it takes longer,
978 # the test *should* fail!
979 time.sleep(20)
980 self.ceph_manager.wait_for_clean(
981 timeout=self.config.get('timeout')
982 )
983
984 # now we wait 20s for the backfill replicas to hear about the clean
985 time.sleep(20)
986 self.log("Recovered, killing an osd")
987 self.kill_osd(mark_down=True, mark_out=True)
988 self.log("Waiting for clean again")
989 self.ceph_manager.wait_for_clean(
990 timeout=self.config.get('timeout')
991 )
992 self.log("Waiting for trim")
993 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
994 self.revive_osd()
995
996 def choose_action(self):
997 """
998 Random action selector.
999 """
1000 chance_down = self.config.get('chance_down', 0.4)
1001 _ = self.config.get('chance_test_min_size', 0)
1002 chance_test_backfill_full = \
1003 self.config.get('chance_test_backfill_full', 0)
1004 if isinstance(chance_down, int):
1005 chance_down = float(chance_down) / 100
1006 minin = self.minin
1007 minout = int(self.config.get("min_out", 0))
1008 minlive = int(self.config.get("min_live", 2))
1009 mindead = int(self.config.get("min_dead", 0))
1010
1011 self.log('choose_action: min_in %d min_out '
1012 '%d min_live %d min_dead %d' %
1013 (minin, minout, minlive, mindead))
1014 actions = []
1015 if len(self.in_osds) > minin:
1016 actions.append((self.out_osd, 1.0,))
1017 if len(self.live_osds) > minlive and chance_down > 0:
1018 actions.append((self.kill_osd, chance_down,))
1019 if len(self.out_osds) > minout:
1020 actions.append((self.in_osd, 1.7,))
1021 if len(self.dead_osds) > mindead:
1022 actions.append((self.revive_osd, 1.0,))
1023 if self.config.get('thrash_primary_affinity', True):
1024 actions.append((self.primary_affinity, 1.0,))
1025 actions.append((self.reweight_osd_or_by_util,
1026 self.config.get('reweight_osd', .5),))
1027 actions.append((self.grow_pool,
1028 self.config.get('chance_pgnum_grow', 0),))
1029 actions.append((self.shrink_pool,
1030 self.config.get('chance_pgnum_shrink', 0),))
1031 actions.append((self.fix_pgp_num,
1032 self.config.get('chance_pgpnum_fix', 0),))
1033 actions.append((self.test_pool_min_size,
1034 self.config.get('chance_test_min_size', 0),))
1035 actions.append((self.test_backfill_full,
1036 chance_test_backfill_full,))
1037 if self.chance_thrash_cluster_full > 0:
1038 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
1039 if self.chance_thrash_pg_upmap > 0:
1040 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
1041 if self.chance_thrash_pg_upmap_items > 0:
1042 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
1043 if self.chance_force_recovery > 0:
1044 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
1045
1046 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
1047 for scenario in [
1048 (lambda:
1049 self.inject_pause(key,
1050 self.config.get('pause_short', 3),
1051 0,
1052 False),
1053 self.config.get('chance_inject_pause_short', 1),),
1054 (lambda:
1055 self.inject_pause(key,
1056 self.config.get('pause_long', 80),
1057 self.config.get('pause_check_after', 70),
1058 True),
1059 self.config.get('chance_inject_pause_long', 0),)]:
1060 actions.append(scenario)
1061
1062 total = sum([y for (x, y) in actions])
1063 val = random.uniform(0, total)
1064 for (action, prob) in actions:
1065 if val < prob:
1066 return action
1067 val -= prob
1068 return None
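# The weights above are relative, not probabilities.  A minimal worked example:
# if the only candidates were (out_osd, 1.0) and (kill_osd, 0.4), the total
# would be 1.4 and a uniform draw over [0, 1.4] picks kill_osd with probability
# 0.4 / 1.4, i.e. roughly 29%; None is only returned in the boundary case where
# the draw equals the total.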
1069
1070 def do_thrash(self):
1071 """
1072 _do_thrash() wrapper.
1073 """
1074 try:
1075 self._do_thrash()
1076 except Exception as e:
1077 # See _run exception comment for MDSThrasher
1078 self.set_thrasher_exception(e)
1079 self.logger.exception("exception:")
1080 # Allow successful completion so gevent doesn't see an exception.
1081 # The DaemonWatchdog will observe the error and tear down the test.
1082
1083 @log_exc
1084 def do_sighup(self):
1085 """
1086 Loops and sends signal.SIGHUP to a random live osd.
1087
1088 Loop delay is controlled by the config value sighup_delay.
1089 """
1090 delay = float(self.sighup_delay)
1091 self.log("starting do_sighup with a delay of {0}".format(delay))
1092 while not self.stopping:
1093 osd = random.choice(self.live_osds)
1094 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
1095 time.sleep(delay)
1096
1097 @log_exc
1098 def do_optrack_toggle(self):
1099 """
1100 Loops and toggle op tracking to all osds.
1101
1102 Loop delay is controlled by the config value optrack_toggle_delay.
1103 """
1104 delay = float(self.optrack_toggle_delay)
1105 osd_state = "true"
1106 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
1107 while not self.stopping:
1108 if osd_state == "true":
1109 osd_state = "false"
1110 else:
1111 osd_state = "true"
1112 try:
1113 self.ceph_manager.inject_args('osd', '*',
1114 'osd_enable_op_tracker',
1115 osd_state)
1116 except CommandFailedError:
1117 self.log('Failed to tell all osds, ignoring')
1118 gevent.sleep(delay)
1119
1120 @log_exc
1121 def do_dump_ops(self):
1122 """
1123 Loops and does op dumps on all osds
1124 """
1125 self.log("starting do_dump_ops")
1126 while not self.stopping:
1127 for osd in self.live_osds:
1128 # Ignore errors because live_osds is in flux
1129 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
1130 check_status=False, timeout=30, stdout=DEVNULL)
1131 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
1132 check_status=False, timeout=30, stdout=DEVNULL)
1133 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
1134 check_status=False, timeout=30, stdout=DEVNULL)
1135 gevent.sleep(0)
1136
1137 @log_exc
1138 def do_noscrub_toggle(self):
1139 """
1140 Loops and toggle noscrub flags
1141
1142 Loop delay is controlled by the config value noscrub_toggle_delay.
1143 """
1144 delay = float(self.noscrub_toggle_delay)
1145 scrub_state = "none"
1146 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
1147 while not self.stopping:
1148 if scrub_state == "none":
1149 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
1150 scrub_state = "noscrub"
1151 elif scrub_state == "noscrub":
1152 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
1153 scrub_state = "both"
1154 elif scrub_state == "both":
1155 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1156 scrub_state = "nodeep-scrub"
1157 else:
1158 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1159 scrub_state = "none"
1160 gevent.sleep(delay)
1161 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1162 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1163
1164 @log_exc
1165 def _do_thrash(self):
1166 """
1167 Loop to select random actions to thrash ceph manager with.
1168 """
1169 cleanint = self.config.get("clean_interval", 60)
1170 scrubint = self.config.get("scrub_interval", -1)
1171 maxdead = self.config.get("max_dead", 0)
1172 delay = self.config.get("op_delay", 5)
1173 self.rerrosd = self.live_osds[0]
1174 if self.random_eio > 0:
1175 self.ceph_manager.inject_args('osd', self.rerrosd,
1176 'filestore_debug_random_read_err',
1177 self.random_eio)
1178 self.ceph_manager.inject_args('osd', self.rerrosd,
1179 'bluestore_debug_random_read_err',
1180 self.random_eio)
1181 self.log("starting do_thrash")
1182 while not self.stopping:
1183 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1184 "out_osds: ", self.out_osds,
1185 "dead_osds: ", self.dead_osds,
1186 "live_osds: ", self.live_osds]]
1187 self.log(" ".join(to_log))
1188 if random.uniform(0, 1) < (float(delay) / cleanint):
1189 while len(self.dead_osds) > maxdead:
1190 self.revive_osd()
1191 for osd in self.in_osds:
1192 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1193 str(osd), str(1))
1194 if random.uniform(0, 1) < float(
1195 self.config.get('chance_test_map_discontinuity', 0)) \
1196 and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1197 self.test_map_discontinuity()
1198 else:
1199 self.ceph_manager.wait_for_recovery(
1200 timeout=self.config.get('timeout')
1201 )
1202 time.sleep(self.clean_wait)
1203 if scrubint > 0:
1204 if random.uniform(0, 1) < (float(delay) / scrubint):
1205 self.log('Scrubbing while thrashing being performed')
1206 Scrubber(self.ceph_manager, self.config)
1207 self.choose_action()()
1208 time.sleep(delay)
1209 self.all_up()
1210 if self.random_eio > 0:
1211 self.ceph_manager.inject_args('osd', self.rerrosd,
1212 'filestore_debug_random_read_err', '0.0')
1213 self.ceph_manager.inject_args('osd', self.rerrosd,
1214 'bluestore_debug_random_read_err', '0.0')
1215 for pool in list(self.pools_to_fix_pgp_num):
1216 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1217 self.fix_pgp_num(pool)
1218 self.pools_to_fix_pgp_num.clear()
1219 for service, opt, saved_value in self.saved_options:
1220 self.ceph_manager.inject_args(service, '*', opt, saved_value)
1221 self.saved_options = []
1222 self.all_up_in()
1223
1224
1225 class ObjectStoreTool:
1226
1227 def __init__(self, manager, pool, **kwargs):
1228 self.manager = manager
1229 self.pool = pool
1230 self.osd = kwargs.get('osd', None)
1231 self.object_name = kwargs.get('object_name', None)
1232 self.do_revive = kwargs.get('do_revive', True)
1233 if self.osd and self.pool and self.object_name:
1234 if self.osd == "primary":
1235 self.osd = self.manager.get_object_primary(self.pool,
1236 self.object_name)
1237 assert self.osd
1238 if self.object_name:
1239 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1240 self.object_name,
1241 self.osd)
1242 self.remote = list(self.manager.ctx.\
1243 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys())[0]
1244 path = self.manager.get_filepath().format(id=self.osd)
1245 self.paths = ("--data-path {path} --journal-path {path}/journal".
1246 format(path=path))
1247
1248 def build_cmd(self, options, args, stdin):
1249 lines = []
1250 if self.object_name:
1251 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1252 "{paths} --pgid {pgid} --op list |"
1253 "grep '\"oid\":\"{name}\"')".
1254 format(paths=self.paths,
1255 pgid=self.pgid,
1256 name=self.object_name))
1257 args = '"$object" ' + args
1258 options += " --pgid {pgid}".format(pgid=self.pgid)
1259 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1260 format(paths=self.paths,
1261 args=args,
1262 options=options))
1263 if stdin:
1264 cmd = ("echo {payload} | base64 --decode | {cmd}".
1265 format(payload=six.ensure_str(base64.b64encode(six.ensure_binary(stdin))),
1266 cmd=cmd))
1267 lines.append(cmd)
1268 return "\n".join(lines)
1269
1270 def run(self, options, args, stdin=None, stdout=None):
1271 if stdout is None:
1272 stdout = BytesIO()
1273 self.manager.kill_osd(self.osd)
1274 cmd = self.build_cmd(options, args, stdin)
1275 self.manager.log(cmd)
1276 try:
1277 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1278 check_status=False,
1279 stdout=stdout,
1280 stderr=BytesIO())
1281 proc.wait()
1282 if proc.exitstatus != 0:
1283 self.manager.log("failed with " + str(proc.exitstatus))
1284 error = six.ensure_str(proc.stdout.getvalue()) + " " + \
1285 six.ensure_str(proc.stderr.getvalue())
1286 raise Exception(error)
1287 finally:
1288 if self.do_revive:
1289 self.manager.revive_osd(self.osd)
1290 self.manager.wait_till_osd_is_up(self.osd, 300)
1291
1292
1293 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1294 # the same name.
1295 class CephManager:
1296 """
1297 Ceph manager object.
1298 Contains several local functions that form a bulk of this module.
1299
1300 :param controller: the remote machine where the Ceph commands should be
1301 executed
1302 :param ctx: the cluster context
1303 :param config: path to Ceph config file
1304 :param logger: for logging messages
1305 :param cluster: name of the Ceph cluster
1306 """
1307
1308 def __init__(self, controller, ctx=None, config=None, logger=None,
1309 cluster='ceph', cephadm=False):
1310 self.lock = threading.RLock()
1311 self.ctx = ctx
1312 self.config = config
1313 self.controller = controller
1314 self.next_pool_id = 0
1315 self.cluster = cluster
1316 self.cephadm = cephadm
1317 if (logger):
1318 self.log = lambda x: logger.info(x)
1319 else:
1320 def tmp(x):
1321 """
1322 implement log behavior.
1323 """
1324 print(x)
1325 self.log = tmp
1326 if self.config is None:
1327 self.config = dict()
1328 pools = self.list_pools()
1329 self.pools = {}
1330 for pool in pools:
1331 # we may race with a pool deletion; ignore failures here
1332 try:
1333 self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
1334 except CommandFailedError:
1335 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1336
1337 def raw_cluster_cmd(self, *args):
1338 """
1339 Run a 'ceph' command against the cluster and return its stdout as a str.
1340 """
1341 if self.cephadm:
1342 proc = shell(self.ctx, self.cluster, self.controller,
1343 args=['ceph'] + list(args),
1344 stdout=BytesIO())
1345 else:
1346 testdir = teuthology.get_testdir(self.ctx)
1347 ceph_args = [
1348 'sudo',
1349 'adjust-ulimits',
1350 'ceph-coverage',
1351 '{tdir}/archive/coverage'.format(tdir=testdir),
1352 'timeout',
1353 '120',
1354 'ceph',
1355 '--cluster',
1356 self.cluster,
1357 '--log-early',
1358 ]
1359 ceph_args.extend(args)
1360 proc = self.controller.run(
1361 args=ceph_args,
1362 stdout=BytesIO(),
1363 )
1364 return six.ensure_str(proc.stdout.getvalue())
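# Typical usage elsewhere in this module (illustrative):
#   out = manager.raw_cluster_cmd('osd', 'dump', '--format=json')
#   osd_dump = json.loads(out)
# i.e. the return value is the command's stdout, ready for json.loads().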
1365
1366 def raw_cluster_cmd_result(self, *args, **kwargs):
1367 """
1368 Run a 'ceph' command against the cluster and return its exit status.
1369 """
1370 if self.cephadm:
1371 proc = shell(self.ctx, self.cluster, self.controller,
1372 args=['ceph'] + list(args),
1373 check_status=False)
1374 else:
1375 testdir = teuthology.get_testdir(self.ctx)
1376 ceph_args = [
1377 'sudo',
1378 'adjust-ulimits',
1379 'ceph-coverage',
1380 '{tdir}/archive/coverage'.format(tdir=testdir),
1381 'timeout',
1382 '120',
1383 'ceph',
1384 '--cluster',
1385 self.cluster,
1386 ]
1387 ceph_args.extend(args)
1388 kwargs['args'] = ceph_args
1389 kwargs['check_status'] = False
1390 proc = self.controller.run(**kwargs)
1391 return proc.exitstatus
1392
1393 def run_ceph_w(self, watch_channel=None):
1394 """
1395 Execute "ceph -w" in the background with stdout connected to a BytesIO,
1396 and return the RemoteProcess.
1397
1398 :param watch_channel: Specifies the channel to be watched. This can be
1399 'cluster', 'audit', ...
1400 :type watch_channel: str
1401 """
1402 args = ["sudo",
1403 "daemon-helper",
1404 "kill",
1405 "ceph",
1406 '--cluster',
1407 self.cluster,
1408 "-w"]
1409 if watch_channel is not None:
1410 args.append("--watch-channel")
1411 args.append(watch_channel)
1412 return self.controller.run(args=args, wait=False, stdout=BytesIO(), stdin=run.PIPE)
1413
1414 def get_mon_socks(self):
1415 """
1416 Get monitor sockets.
1417
1418 :return socks: tuple of strings; strings are individual sockets.
1419 """
1420 from json import loads
1421
1422 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1423 socks = []
1424 for mon in output['mons']:
1425 for addrvec_mem in mon['public_addrs']['addrvec']:
1426 socks.append(addrvec_mem['addr'])
1427 return tuple(socks)
1428
1429 def get_msgrv1_mon_socks(self):
1430 """
1431 Get monitor sockets that use msgrv1 to operate.
1432
1433 :return socks: tuple of strings; strings are individual sockets.
1434 """
1435 from json import loads
1436
1437 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1438 socks = []
1439 for mon in output['mons']:
1440 for addrvec_mem in mon['public_addrs']['addrvec']:
1441 if addrvec_mem['type'] == 'v1':
1442 socks.append(addrvec_mem['addr'])
1443 return tuple(socks)
1444
1445 def get_msgrv2_mon_socks(self):
1446 """
1447 Get monitor sockets that use msgrv2 to operate.
1448
1449 :return socks: tuple of strings; strings are individual sockets.
1450 """
1451 from json import loads
1452
1453 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1454 socks = []
1455 for mon in output['mons']:
1456 for addrvec_mem in mon['public_addrs']['addrvec']:
1457 if addrvec_mem['type'] == 'v2':
1458 socks.append(addrvec_mem['addr'])
1459 return tuple(socks)
1460
1461 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1462 """
1463 Flush pg stats from a list of OSD ids, ensuring they are reflected
1464 all the way to the monitor. Luminous and later only.
1465
1466 :param osds: list of OSDs to flush
1467 :param no_wait: list of OSDs not to wait for seq id. by default, we
1468 wait for all specified osds, but some of them could be
1469 moved out of osdmap, so we cannot get their updated
1470 stat seq from monitor anymore. in that case, you need
1471 to pass a blacklist.
1472 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1473 it. (5 min by default)
1474 """
1475 seq = {osd: int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
1476 for osd in osds}
1477 if not wait_for_mon:
1478 return
1479 if no_wait is None:
1480 no_wait = []
1481 for osd, need in seq.items():
1482 if osd in no_wait:
1483 continue
1484 got = 0
1485 while wait_for_mon > 0:
1486 got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
1487 self.log('need seq {need} got {got} for osd.{osd}'.format(
1488 need=need, got=got, osd=osd))
1489 if got >= need:
1490 break
1491 A_WHILE = 1
1492 time.sleep(A_WHILE)
1493 wait_for_mon -= A_WHILE
1494 else:
1495 raise Exception('timed out waiting for mon to be updated with '
1496 'osd.{osd}: {got} < {need}'.
1497 format(osd=osd, got=got, need=need))
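# The handshake above: 'tell osd.N flush_pg_stats' returns the stat sequence
# number the OSD just published, and 'osd last-stat-seq osd.N' reports the
# latest sequence the mon/mgr has recorded for that OSD, so the loop simply
# waits until got >= need or the wait_for_mon budget is exhausted.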
1498
1499 def flush_all_pg_stats(self):
1500 self.flush_pg_stats(range(len(self.get_osd_dump())))
1501
1502 def do_rados(self, remote, cmd, check_status=True):
1503 """
1504 Execute a remote rados command.
1505 """
1506 testdir = teuthology.get_testdir(self.ctx)
1507 pre = [
1508 'adjust-ulimits',
1509 'ceph-coverage',
1510 '{tdir}/archive/coverage'.format(tdir=testdir),
1511 'rados',
1512 '--cluster',
1513 self.cluster,
1514 ]
1515 pre.extend(cmd)
1516 proc = remote.run(
1517 args=pre,
1518 wait=True,
1519 check_status=check_status
1520 )
1521 return proc
1522
1523 def rados_write_objects(self, pool, num_objects, size,
1524 timelimit, threads, cleanup=False):
1525 """
1526 Write rados objects
1527 Threads not used yet.
1528 """
1529 args = [
1530 '-p', pool,
1531 '--num-objects', num_objects,
1532 '-b', size,
1533 'bench', timelimit,
1534 'write'
1535 ]
1536 if not cleanup:
1537 args.append('--no-cleanup')
1538 return self.do_rados(self.controller, map(str, args))
1539
1540 def do_put(self, pool, obj, fname, namespace=None):
1541 """
1542 Implement rados put operation
1543 """
1544 args = ['-p', pool]
1545 if namespace is not None:
1546 args += ['-N', namespace]
1547 args += [
1548 'put',
1549 obj,
1550 fname
1551 ]
1552 return self.do_rados(
1553 self.controller,
1554 args,
1555 check_status=False
1556 ).exitstatus
1557
1558 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1559 """
1560 Implement rados get operation
1561 """
1562 args = ['-p', pool]
1563 if namespace is not None:
1564 args += ['-N', namespace]
1565 args += [
1566 'get',
1567 obj,
1568 fname
1569 ]
1570 return self.do_rados(
1571 self.controller,
1572 args,
1573 check_status=False
1574 ).exitstatus
1575
1576 def do_rm(self, pool, obj, namespace=None):
1577 """
1578 Implement rados rm operation
1579 """
1580 args = ['-p', pool]
1581 if namespace is not None:
1582 args += ['-N', namespace]
1583 args += [
1584 'rm',
1585 obj
1586 ]
1587 return self.do_rados(
1588 self.controller,
1589 args,
1590 check_status=False
1591 ).exitstatus
1592
1593 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1594 if stdout is None:
1595 stdout = BytesIO()
1596 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1597
1598 def find_remote(self, service_type, service_id):
1599 """
1600 Get the Remote for the host where a particular service runs.
1601
1602 :param service_type: 'mds', 'osd', 'client'
1603 :param service_id: The second part of a role, e.g. '0' for
1604 the role 'client.0'
1605 :return: a Remote instance for the host where the
1606 requested role is placed
1607 """
1608 return get_remote(self.ctx, self.cluster,
1609 service_type, service_id)
1610
1611 def admin_socket(self, service_type, service_id,
1612 command, check_status=True, timeout=0, stdout=None):
1613 """
1614 Remotely start up ceph specifying the admin socket
1615 :param command: a list of words to use as the command
1616 to the admin socket
1617 """
1618 if stdout is None:
1619 stdout = BytesIO()
1620
1621 remote = self.find_remote(service_type, service_id)
1622
1623 if self.cephadm:
1624 return shell(
1625 self.ctx, self.cluster, remote,
1626 args=[
1627 'ceph', 'daemon', '%s.%s' % (service_type, service_id),
1628 ] + command,
1629 stdout=stdout,
1630 wait=True,
1631 check_status=check_status,
1632 )
1633
1634 testdir = teuthology.get_testdir(self.ctx)
1635 args = [
1636 'sudo',
1637 'adjust-ulimits',
1638 'ceph-coverage',
1639 '{tdir}/archive/coverage'.format(tdir=testdir),
1640 'timeout',
1641 str(timeout),
1642 'ceph',
1643 '--cluster',
1644 self.cluster,
1645 '--admin-daemon',
1646 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1647 cluster=self.cluster,
1648 type=service_type,
1649 id=service_id),
1650 ]
1651 args.extend(command)
1652 return remote.run(
1653 args=args,
1654 stdout=stdout,
1655 wait=True,
1656 check_status=check_status
1657 )
1658
1659 def objectstore_tool(self, pool, options, args, **kwargs):
1660 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1661
1662 def get_pgid(self, pool, pgnum):
1663 """
1664 :param pool: pool name
1665 :param pgnum: pg number
1666 :returns: a string representing this pg.
1667 """
1668 poolnum = self.get_pool_num(pool)
1669 pg_str = "{poolnum}.{pgnum}".format(
1670 poolnum=poolnum,
1671 pgnum=pgnum)
1672 return pg_str
1673
1674 def get_pg_replica(self, pool, pgnum):
1675 """
1676 get replica for pool, pgnum (e.g. (data, 0) -> 0)
1677 """
1678 pg_str = self.get_pgid(pool, pgnum)
1679 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1680 j = json.loads('\n'.join(output.split('\n')[1:]))
1681 return int(j['acting'][-1])
1682 assert False
1683
1684 def wait_for_pg_stats(func):
1685 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1686 # by default, and taking fault injection delays (in ms) into consideration,
1687 # 12 seconds are more than enough
1688 delays = [1, 1, 2, 3, 5, 8, 13, 0]
1689 @wraps(func)
1690 def wrapper(self, *args, **kwargs):
1691 exc = None
1692 for delay in delays:
1693 try:
1694 return func(self, *args, **kwargs)
1695 except AssertionError as e:
1696 time.sleep(delay)
1697 exc = e
1698 raise exc
1699 return wrapper
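# wait_for_pg_stats is meant to decorate methods whose assertions depend on
# freshly reported PG stats: it retries up to eight times with Fibonacci-style
# sleeps (1, 1, 2, 3, 5, 8, 13 seconds) and re-raises the last AssertionError
# if the condition never holds.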
1700
1701 def get_pg_primary(self, pool, pgnum):
1702 """
1703 get primary for pool, pgnum (e.g. (data, 0) -> 0)
1704 """
1705 pg_str = self.get_pgid(pool, pgnum)
1706 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1707 j = json.loads('\n'.join(output.split('\n')[1:]))
1708 return int(j['acting'][0])
1709 assert False
1710
1711 def get_pool_num(self, pool):
1712 """
1713 get number for pool (e.g., data -> 2)
1714 """
1715 return int(self.get_pool_dump(pool)['pool'])
1716
1717 def list_pools(self):
1718 """
1719 list all pool names
1720 """
1721 osd_dump = self.get_osd_dump_json()
1722 self.log(osd_dump['pools'])
1723 return [str(i['pool_name']) for i in osd_dump['pools']]
1724
1725 def clear_pools(self):
1726 """
1727 remove all pools
1728 """
1729 [self.remove_pool(i) for i in self.list_pools()]
1730
1731 def kick_recovery_wq(self, osdnum):
1732 """
1733 Run kick_recovery_wq on cluster.
1734 """
1735 return self.raw_cluster_cmd(
1736 'tell', "osd.%d" % (int(osdnum),),
1737 'debug',
1738 'kick_recovery_wq',
1739 '0')
1740
1741 def wait_run_admin_socket(self, service_type,
1742 service_id, args=['version'], timeout=75, stdout=None):
1743 """
1744 If osd_admin_socket call succeeds, return. Otherwise wait
1745 five seconds and try again.
1746 """
1747 if stdout is None:
1748 stdout = BytesIO()
1749 tries = 0
1750 while True:
1751 proc = self.admin_socket(service_type, service_id,
1752 args, check_status=False, stdout=stdout)
1753 if proc.exitstatus == 0:
1754 return proc
1755 else:
1756 tries += 1
1757 if (tries * 5) > timeout:
1758 raise Exception('timed out waiting for admin_socket '
1759 'to appear after {type}.{id} restart'.
1760 format(type=service_type,
1761 id=service_id))
1762 self.log("waiting on admin_socket for {type}-{id}, "
1763 "{command}".format(type=service_type,
1764 id=service_id,
1765 command=args))
1766 time.sleep(5)
1767
1768 def get_pool_dump(self, pool):
1769 """
1770 get the osd dump part of a pool
1771 """
1772 osd_dump = self.get_osd_dump_json()
1773 for i in osd_dump['pools']:
1774 if i['pool_name'] == pool:
1775 return i
1776 assert False
1777
1778 def get_config(self, service_type, service_id, name):
1779 """
1780 :param service_type: daemon type (e.g. 'mon'); service_id: its id (e.g. 'a')
1781 :param name: the option name
1782 """
1783 proc = self.wait_run_admin_socket(service_type, service_id,
1784 ['config', 'show'])
1785 j = json.loads(proc.stdout.getvalue())
1786 return j[name]
1787
1788 def inject_args(self, service_type, service_id, name, value):
1789 whom = '{0}.{1}'.format(service_type, service_id)
1790 if isinstance(value, bool):
1791 value = 'true' if value else 'false'
1792 opt_arg = '--{name}={value}'.format(name=name, value=value)
1793 self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
1794
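# Illustrative sketch (not part of the original file): inject_args() is a thin
# wrapper around "ceph tell <who> injectargs"; e.g. throttling backfill on a
# (hypothetical) osd.3 amounts to:
def _example_inject_args(manager):
    # equivalent to: ceph tell osd.3 injectargs --osd_max_backfills=1
    manager.inject_args('osd', 3, 'osd_max_backfills', 1)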
1795 def set_config(self, osdnum, **argdict):
1796 """
1797 :param osdnum: osd number
1798 :param argdict: dictionary containing values to set.
1799 """
1800 for k, v in argdict.items():
1801 self.wait_run_admin_socket(
1802 'osd', osdnum,
1803 ['config', 'set', str(k), str(v)])
1804
1805 def raw_cluster_status(self):
1806 """
1807 Get status from cluster
1808 """
1809 status = self.raw_cluster_cmd('status', '--format=json')
1810 return json.loads(status)
1811
1812 def raw_osd_status(self):
1813 """
1814 Get osd status from cluster
1815 """
1816 return self.raw_cluster_cmd('osd', 'dump')
1817
1818 def get_osd_status(self):
1819 """
1820 Get osd statuses sorted by states that the osds are in.
1821 """
1822 osd_lines = list(filter(
1823 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1824 self.raw_osd_status().split('\n')))
1825 self.log(osd_lines)
1826 in_osds = [int(i[4:].split()[0])
1827 for i in filter(lambda x: " in " in x, osd_lines)]
1828 out_osds = [int(i[4:].split()[0])
1829 for i in filter(lambda x: " out " in x, osd_lines)]
1830 up_osds = [int(i[4:].split()[0])
1831 for i in filter(lambda x: " up " in x, osd_lines)]
1832 down_osds = [int(i[4:].split()[0])
1833 for i in filter(lambda x: " down " in x, osd_lines)]
1834 dead_osds = [int(x.id_)
1835 for x in filter(lambda x:
1836 not x.running(),
1837 self.ctx.daemons.
1838 iter_daemons_of_role('osd', self.cluster))]
1839 live_osds = [int(x.id_) for x in
1840 filter(lambda x:
1841 x.running(),
1842 self.ctx.daemons.iter_daemons_of_role('osd',
1843 self.cluster))]
1844 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1845 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1846 'raw': osd_lines}
1847
1848 def get_num_pgs(self):
1849 """
1850 Check cluster status for the number of pgs
1851 """
1852 status = self.raw_cluster_status()
1853 self.log(status)
1854 return status['pgmap']['num_pgs']
1855
1856 def create_erasure_code_profile(self, profile_name, profile):
1857 """
1858 Create an erasure code profile that can be referenced by name
1859 when creating an erasure coded pool.
1860 """
1861 with self.lock:
1862 args = cmd_erasure_code_profile(profile_name, profile)
1863 self.raw_cluster_cmd(*args)
1864
1865 def create_pool_with_unique_name(self, pg_num=16,
1866 erasure_code_profile_name=None,
1867 min_size=None,
1868 erasure_code_use_overwrites=False):
1869 """
1870 Create a pool named unique_pool_X where X is unique.
1871 """
1872 name = ""
1873 with self.lock:
1874 name = "unique_pool_%s" % (str(self.next_pool_id),)
1875 self.next_pool_id += 1
1876 self.create_pool(
1877 name,
1878 pg_num,
1879 erasure_code_profile_name=erasure_code_profile_name,
1880 min_size=min_size,
1881 erasure_code_use_overwrites=erasure_code_use_overwrites)
1882 return name
1883
1884 @contextlib.contextmanager
1885 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1886 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1887 yield
1888 self.remove_pool(pool_name)
1889
1890 def create_pool(self, pool_name, pg_num=16,
1891 erasure_code_profile_name=None,
1892 min_size=None,
1893 erasure_code_use_overwrites=False):
1894 """
1895 Create a pool named from the pool_name parameter.
1896 :param pool_name: name of the pool being created.
1897 :param pg_num: initial number of pgs.
1898 :param erasure_code_profile_name: if not None, create an erasure
1899 coded pool using this profile
1900 :param erasure_code_use_overwrites: if true, allow overwrites
1901 """
1902 with self.lock:
1903 assert isinstance(pool_name, six.string_types)
1904 assert isinstance(pg_num, int)
1905 assert pool_name not in self.pools
1906 self.log("creating pool_name %s" % (pool_name,))
1907 if erasure_code_profile_name:
1908 self.raw_cluster_cmd('osd', 'pool', 'create',
1909 pool_name, str(pg_num), str(pg_num),
1910 'erasure', erasure_code_profile_name)
1911 else:
1912 self.raw_cluster_cmd('osd', 'pool', 'create',
1913 pool_name, str(pg_num))
1914 if min_size is not None:
1915 self.raw_cluster_cmd(
1916 'osd', 'pool', 'set', pool_name,
1917 'min_size',
1918 str(min_size))
1919 if erasure_code_use_overwrites:
1920 self.raw_cluster_cmd(
1921 'osd', 'pool', 'set', pool_name,
1922 'allow_ec_overwrites',
1923 'true')
1924 self.raw_cluster_cmd(
1925 'osd', 'pool', 'application', 'enable',
1926 pool_name, 'rados', '--yes-i-really-mean-it',
1927 run.Raw('||'), 'true')
1928 self.pools[pool_name] = pg_num
1929 time.sleep(1)
1930
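# Illustrative sketch (not part of the original file): creating a replicated
# pool and an erasure coded pool with overwrites enabled; the pool and profile
# names are hypothetical, "manager" is assumed to be a CephManager instance.
def _example_create_pools(manager):
    manager.create_pool('test_repl', pg_num=16, min_size=2)
    manager.create_erasure_code_profile('test_profile', {'k': 2, 'm': 1})
    manager.create_pool('test_ec', pg_num=16,
                        erasure_code_profile_name='test_profile',
                        erasure_code_use_overwrites=True)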
1931 def add_pool_snap(self, pool_name, snap_name):
1932 """
1933 Add pool snapshot
1934 :param pool_name: name of pool to snapshot
1935 :param snap_name: name of snapshot to take
1936 """
1937 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1938 str(pool_name), str(snap_name))
1939
1940 def remove_pool_snap(self, pool_name, snap_name):
1941 """
1942 Remove pool snapshot
1943 :param pool_name: name of the pool the snapshot belongs to
1944 :param snap_name: name of snapshot to remove
1945 """
1946 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1947 str(pool_name), str(snap_name))
1948
1949 def remove_pool(self, pool_name):
1950 """
1951 Remove the indicated pool
1952 :param pool_name: Pool to be removed
1953 """
1954 with self.lock:
1955 assert isinstance(pool_name, six.string_types)
1956 assert pool_name in self.pools
1957 self.log("removing pool_name %s" % (pool_name,))
1958 del self.pools[pool_name]
1959 self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
1960 "--yes-i-really-really-mean-it")
1961
1962 def get_pool(self):
1963 """
1964 Pick a random pool
1965 """
1966 with self.lock:
1967 if self.pools:
1968 return random.choice(list(self.pools.keys()))
1969
1970 def get_pool_pg_num(self, pool_name):
1971 """
1972 Return the number of pgs in the pool specified.
1973 """
1974 with self.lock:
1975 assert isinstance(pool_name, six.string_types)
1976 if pool_name in self.pools:
1977 return self.pools[pool_name]
1978 return 0
1979
1980 def get_pool_property(self, pool_name, prop):
1981 """
1982 :param pool_name: pool
1983 :param prop: property to be checked.
1984 :returns: property as string
1985 """
1986 with self.lock:
1987 assert isinstance(pool_name, six.string_types)
1988 assert isinstance(prop, six.string_types)
1989 output = self.raw_cluster_cmd(
1990 'osd',
1991 'pool',
1992 'get',
1993 pool_name,
1994 prop)
1995 return output.split()[1]
1996
1997 def get_pool_int_property(self, pool_name, prop):
1998 return int(self.get_pool_property(pool_name, prop))
1999
2000 def set_pool_property(self, pool_name, prop, val):
2001 """
2002 :param pool_name: pool
2003 :param prop: property to be set.
2004 :param val: value to set.
2005
2006 This routine retries if set operation fails.
2007 """
2008 with self.lock:
2009 assert isinstance(pool_name, six.string_types)
2010 assert isinstance(prop, six.string_types)
2011 assert isinstance(val, int)
2012 tries = 0
2013 while True:
2014 r = self.raw_cluster_cmd_result(
2015 'osd',
2016 'pool',
2017 'set',
2018 pool_name,
2019 prop,
2020 str(val))
2021 if r != 11: # EAGAIN
2022 break
2023 tries += 1
2024 if tries > 50:
2025 raise Exception('gave up after repeated EAGAIN '
2026 'when setting pool property %s %s = %s' %
2027 (pool_name, prop, val))
2028 self.log('got EAGAIN setting pool property, '
2029 'waiting a few seconds...')
2030 time.sleep(2)
2031
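# Illustrative sketch (not part of the original file): set_pool_property()
# already retries on EAGAIN (errno 11), so callers can set values such as
# min_size directly; the pool name is hypothetical.
def _example_set_pool_property(manager):
    manager.set_pool_property('test_repl', 'min_size', 1)
    assert manager.get_pool_int_property('test_repl', 'min_size') == 1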
2032 def expand_pool(self, pool_name, by, max_pgs):
2033 """
2034 Increase the number of pgs in a pool
2035 """
2036 with self.lock:
2037 assert isinstance(pool_name, six.string_types)
2038 assert isinstance(by, int)
2039 assert pool_name in self.pools
2040 if self.get_num_creating() > 0:
2041 return False
2042 if (self.pools[pool_name] + by) > max_pgs:
2043 return False
2044 self.log("increase pool size by %d" % (by,))
2045 new_pg_num = self.pools[pool_name] + by
2046 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2047 self.pools[pool_name] = new_pg_num
2048 return True
2049
2050 def contract_pool(self, pool_name, by, min_pgs):
2051 """
2052 Decrease the number of pgs in a pool
2053 """
2054 with self.lock:
2055 self.log('contract_pool %s by %s min %s' % (
2056 pool_name, str(by), str(min_pgs)))
2057 assert isinstance(pool_name, six.string_types)
2058 assert isinstance(by, int)
2059 assert pool_name in self.pools
2060 if self.get_num_creating() > 0:
2061 self.log('too many creating')
2062 return False
2063 proj = self.pools[pool_name] - by
2064 if proj < min_pgs:
2065 self.log('would drop below min_pgs, proj %d, currently %d' % (proj, self.pools[pool_name]))
2066 return False
2067 self.log("decrease pool size by %d" % (by,))
2068 new_pg_num = self.pools[pool_name] - by
2069 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2070 self.pools[pool_name] = new_pg_num
2071 return True
2072
2073 def stop_pg_num_changes(self):
2074 """
2075 Reset all pg_num_targets back to pg_num, canceling splits and merges
2076 """
2077 self.log('Canceling any pending splits or merges...')
2078 osd_dump = self.get_osd_dump_json()
2079 try:
2080 for pool in osd_dump['pools']:
2081 if pool['pg_num'] != pool['pg_num_target']:
2082 self.log('Setting pool %s (%d) pg_num %d -> %d' %
2083 (pool['pool_name'], pool['pool'],
2084 pool['pg_num_target'],
2085 pool['pg_num']))
2086 self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
2087 'pg_num', str(pool['pg_num']))
2088 except KeyError:
2089 # we don't support pg_num_target before nautilus
2090 pass
2091
2092 def set_pool_pgpnum(self, pool_name, force):
2093 """
2094 Set pgpnum property of pool_name pool.
2095 """
2096 with self.lock:
2097 assert isinstance(pool_name, six.string_types)
2098 assert pool_name in self.pools
2099 if not force and self.get_num_creating() > 0:
2100 return False
2101 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
2102 return True
2103
2104 def list_pg_unfound(self, pgid):
2105 """
2106 return list of unfound pgs with the id specified
2107 """
2108 r = None
2109 offset = {}
2110 while True:
2111 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
2112 json.dumps(offset))
2113 j = json.loads(out)
2114 if r is None:
2115 r = j
2116 else:
2117 r['objects'].extend(j['objects'])
2118 if 'more' not in j:
2119 break
2120 if j['more'] == 0:
2121 break
2122 offset = j['objects'][-1]['oid']
2123 if 'more' in r:
2124 del r['more']
2125 return r
2126
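# Illustrative sketch (not part of the original file): list_pg_unfound() pages
# through "pg <pgid> list_unfound" using the last returned oid as the next
# offset and merges the pages, so callers only deal with the final result.
def _example_count_unfound(manager, pgid):
    unfound = manager.list_pg_unfound(pgid)
    return len(unfound['objects']) if unfound else 0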
2127 def get_pg_stats(self):
2128 """
2129 Dump the cluster and get pg stats
2130 """
2131 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
2132 j = json.loads('\n'.join(out.split('\n')[1:]))
2133 try:
2134 return j['pg_map']['pg_stats']
2135 except KeyError:
2136 return j['pg_stats']
2137
2138 def get_pgids_to_force(self, backfill):
2139 """
2140 Return a random subset of the PGs whose recovery/backfill can be forced
2141 """
2142 j = self.get_pg_stats()
2143 pgids = []
2144 if backfill:
2145 wanted = ['degraded', 'backfilling', 'backfill_wait']
2146 else:
2147 wanted = ['recovering', 'degraded', 'recovery_wait']
2148 for pg in j:
2149 status = pg['state'].split('+')
2150 for t in wanted:
2151 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
2152 pgids.append(pg['pgid'])
2153 break
2154 return pgids
2155
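# Illustrative sketch (not part of the original file): the pgids returned by
# get_pgids_to_force() are meant to be fed into "ceph pg force-backfill" /
# "ceph pg force-recovery"; "manager" is assumed to be a CephManager instance.
def _example_force_backfill(manager):
    pgids = manager.get_pgids_to_force(backfill=True)
    if pgids:
        manager.raw_cluster_cmd('pg', 'force-backfill', *pgids)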
2156 def get_pgids_to_cancel_force(self, backfill):
2157 """
2158 Return a random subset of the PGs whose recovery/backfill priority is forced
2159 """
2160 j = self.get_pg_stats()
2161 pgids = []
2162 if backfill:
2163 wanted = 'forced_backfill'
2164 else:
2165 wanted = 'forced_recovery'
2166 for pg in j:
2167 status = pg['state'].split('+')
2168 if wanted in status and random.random() > 0.5:
2169 pgids.append(pg['pgid'])
2170 return pgids
2171
2172 def compile_pg_status(self):
2173 """
2174 Return a histogram of pg state values
2175 """
2176 ret = {}
2177 j = self.get_pg_stats()
2178 for pg in j:
2179 for status in pg['state'].split('+'):
2180 if status not in ret:
2181 ret[status] = 0
2182 ret[status] += 1
2183 return ret
2184
2185 @wait_for_pg_stats # type: ignore
2186 def with_pg_state(self, pool, pgnum, check):
2187 pgstr = self.get_pgid(pool, pgnum)
2188 stats = self.get_single_pg_stats(pgstr)
2189 assert(check(stats['state']))
2190
2191 @wait_for_pg_stats # type: ignore
2192 def with_pg(self, pool, pgnum, check):
2193 pgstr = self.get_pgid(pool, pgnum)
2194 stats = self.get_single_pg_stats(pgstr)
2195 return check(stats)
2196
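# Illustrative sketch (not part of the original file): because with_pg_state()
# is wrapped by wait_for_pg_stats, a failing check is retried while the mgr
# catches up; the pool name below is hypothetical.
def _example_wait_active_clean(manager):
    manager.with_pg_state('test_repl', 0,
                          lambda state: 'active+clean' in state)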
2197 def get_last_scrub_stamp(self, pool, pgnum):
2198 """
2199 Get the timestamp of the last scrub.
2200 """
2201 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
2202 return stats["last_scrub_stamp"]
2203
2204 def do_pg_scrub(self, pool, pgnum, stype):
2205 """
2206 Scrub pg and wait for scrubbing to finish
2207 """
2208 init = self.get_last_scrub_stamp(pool, pgnum)
2209 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
2210 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
2211 SLEEP_TIME = 10
2212 timer = 0
2213 while init == self.get_last_scrub_stamp(pool, pgnum):
2214 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
2215 self.log("waiting for scrub type %s" % (stype,))
2216 if (timer % RESEND_TIMEOUT) == 0:
2217 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
2218 # The first time in this loop is the actual request
2219 if timer != 0 and stype == "repair":
2220 self.log("WARNING: Resubmitted a non-idempotent repair")
2221 time.sleep(SLEEP_TIME)
2222 timer += SLEEP_TIME
2223
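# Illustrative sketch (not part of the original file): do_pg_scrub() returns
# only once last_scrub_stamp has changed, so a deep scrub of pg 0 of a
# (hypothetical) pool can be driven and verified like this.
def _example_deep_scrub(manager):
    before = manager.get_last_scrub_stamp('test_repl', 0)
    manager.do_pg_scrub('test_repl', 0, 'deep-scrub')
    assert manager.get_last_scrub_stamp('test_repl', 0) != before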
2224 def wait_snap_trimming_complete(self, pool):
2225 """
2226 Wait for snap trimming on pool to end
2227 """
2228 POLL_PERIOD = 10
2229 FATAL_TIMEOUT = 600
2230 start = time.time()
2231 poolnum = self.get_pool_num(pool)
2232 poolnumstr = "%s." % (poolnum,)
2233 while True:
2234 now = time.time()
2235 assert (now - start) <= FATAL_TIMEOUT, \
2236 'failed to complete snap trimming before timeout'
2238 all_stats = self.get_pg_stats()
2239 trimming = False
2240 for pg in all_stats:
2241 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
2242 self.log("pg {pg} in trimming, state: {state}".format(
2243 pg=pg['pgid'],
2244 state=pg['state']))
2245 trimming = True
2246 if not trimming:
2247 break
2248 self.log("{pool} still trimming, waiting".format(pool=pool))
2249 time.sleep(POLL_PERIOD)
2250
2251 def get_single_pg_stats(self, pgid):
2252 """
2253 Return pg for the pgid specified.
2254 """
2255 all_stats = self.get_pg_stats()
2256
2257 for pg in all_stats:
2258 if pg['pgid'] == pgid:
2259 return pg
2260
2261 return None
2262
2263 def get_object_pg_with_shard(self, pool, name, osdid):
2264 """
2265 """
2266 pool_dump = self.get_pool_dump(pool)
2267 object_map = self.get_object_map(pool, name)
2268 if pool_dump["type"] == PoolType.ERASURE_CODED:
2269 shard = object_map['acting'].index(osdid)
2270 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
2271 shard=shard)
2272 else:
2273 return object_map['pgid']
2274
2275 def get_object_primary(self, pool, name):
2276 """
2277 """
2278 object_map = self.get_object_map(pool, name)
2279 return object_map['acting_primary']
2280
2281 def get_object_map(self, pool, name):
2282 """
2283 osd map --format=json converted to a python object
2284 :returns: the python object
2285 """
2286 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
2287 return json.loads('\n'.join(out.split('\n')[1:]))
2288
2289 def get_osd_dump_json(self):
2290 """
2291 osd dump --format=json converted to a python object
2292 :returns: the python object
2293 """
2294 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
2295 return json.loads('\n'.join(out.split('\n')[1:]))
2296
2297 def get_osd_dump(self):
2298 """
2299 Dump osds
2300 :returns: all osds
2301 """
2302 return self.get_osd_dump_json()['osds']
2303
2304 def get_osd_metadata(self):
2305 """
2306 osd metadata --format=json converted to a python object
2307 :returns: the python object containing osd metadata information
2308 """
2309 out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
2310 return json.loads('\n'.join(out.split('\n')[1:]))
2311
2312 def get_mgr_dump(self):
2313 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2314 return json.loads(out)
2315
2316 def get_stuck_pgs(self, type_, threshold):
2317 """
2318 :returns: stuck pg information from the cluster
2319 """
2320 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2321 '--format=json')
2322 return json.loads(out).get('stuck_pg_stats', [])
2323
2324 def get_num_unfound_objects(self):
2325 """
2326 Check cluster status to get the number of unfound objects
2327 """
2328 status = self.raw_cluster_status()
2329 self.log(status)
2330 return status['pgmap'].get('unfound_objects', 0)
2331
2332 def get_num_creating(self):
2333 """
2334 Find the number of pgs in creating mode.
2335 """
2336 pgs = self.get_pg_stats()
2337 num = 0
2338 for pg in pgs:
2339 if 'creating' in pg['state']:
2340 num += 1
2341 return num
2342
2343 def get_num_active_clean(self):
2344 """
2345 Find the number of active and clean pgs.
2346 """
2347 pgs = self.get_pg_stats()
2348 return self._get_num_active_clean(pgs)
2349
2350 def _get_num_active_clean(self, pgs):
2351 num = 0
2352 for pg in pgs:
2353 if (pg['state'].count('active') and
2354 pg['state'].count('clean') and
2355 not pg['state'].count('stale')):
2356 num += 1
2357 return num
2358
2359 def get_num_active_recovered(self):
2360 """
2361 Find the number of active and recovered pgs.
2362 """
2363 pgs = self.get_pg_stats()
2364 return self._get_num_active_recovered(pgs)
2365
2366 def _get_num_active_recovered(self, pgs):
2367 num = 0
2368 for pg in pgs:
2369 if (pg['state'].count('active') and
2370 not pg['state'].count('recover') and
2371 not pg['state'].count('backfilling') and
2372 not pg['state'].count('stale')):
2373 num += 1
2374 return num
2375
2376 def get_is_making_recovery_progress(self):
2377 """
2378 Return whether there is recovery progress discernable in the
2379 raw cluster status
2380 """
2381 status = self.raw_cluster_status()
2382 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2383 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2384 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2385 return kps > 0 or bps > 0 or ops > 0
2386
2387 def get_num_active(self):
2388 """
2389 Find the number of active pgs.
2390 """
2391 pgs = self.get_pg_stats()
2392 return self._get_num_active(pgs)
2393
2394 def _get_num_active(self, pgs):
2395 num = 0
2396 for pg in pgs:
2397 if pg['state'].count('active') and not pg['state'].count('stale'):
2398 num += 1
2399 return num
2400
2401 def get_num_down(self):
2402 """
2403 Find the number of pgs that are down.
2404 """
2405 pgs = self.get_pg_stats()
2406 num = 0
2407 for pg in pgs:
2408 if ((pg['state'].count('down') and not
2409 pg['state'].count('stale')) or
2410 (pg['state'].count('incomplete') and not
2411 pg['state'].count('stale'))):
2412 num += 1
2413 return num
2414
2415 def get_num_active_down(self):
2416 """
2417 Find the number of pgs that are either active or down.
2418 """
2419 pgs = self.get_pg_stats()
2420 return self._get_num_active_down(pgs)
2421
2422 def _get_num_active_down(self, pgs):
2423 num = 0
2424 for pg in pgs:
2425 if ((pg['state'].count('active') and not
2426 pg['state'].count('stale')) or
2427 (pg['state'].count('down') and not
2428 pg['state'].count('stale')) or
2429 (pg['state'].count('incomplete') and not
2430 pg['state'].count('stale'))):
2431 num += 1
2432 return num
2433
2434 def get_num_peered(self):
2435 """
2436 Find the number of PGs that are peered
2437 """
2438 pgs = self.get_pg_stats()
2439 return self._get_num_peered(pgs)
2440
2441 def _get_num_peered(self, pgs):
2442 num = 0
2443 for pg in pgs:
2444 if pg['state'].count('peered') and not pg['state'].count('stale'):
2445 num += 1
2446 return num
2447
2448 def is_clean(self):
2449 """
2450 True if all pgs are clean
2451 """
2452 pgs = self.get_pg_stats()
2453 return self._get_num_active_clean(pgs) == len(pgs)
2454
2455 def is_recovered(self):
2456 """
2457 True if all pgs have recovered
2458 """
2459 pgs = self.get_pg_stats()
2460 return self._get_num_active_recovered(pgs) == len(pgs)
2461
2462 def is_active_or_down(self):
2463 """
2464 True if all pgs are active or down
2465 """
2466 pgs = self.get_pg_stats()
2467 return self._get_num_active_down(pgs) == len(pgs)
2468
2469 def wait_for_clean(self, timeout=1200):
2470 """
2471 Block until all pgs are clean; fail if the timeout expires.
2472 """
2473 self.log("waiting for clean")
2474 start = time.time()
2475 num_active_clean = self.get_num_active_clean()
2476 while not self.is_clean():
2477 if timeout is not None:
2478 if self.get_is_making_recovery_progress():
2479 self.log("making progress, resetting timeout")
2480 start = time.time()
2481 else:
2482 self.log("no progress seen, keeping timeout for now")
2483 if time.time() - start >= timeout:
2484 self.log('dumping pgs')
2485 out = self.raw_cluster_cmd('pg', 'dump')
2486 self.log(out)
2487 assert time.time() - start < timeout, \
2488 'failed to become clean before timeout expired'
2489 cur_active_clean = self.get_num_active_clean()
2490 if cur_active_clean != num_active_clean:
2491 start = time.time()
2492 num_active_clean = cur_active_clean
2493 time.sleep(3)
2494 self.log("clean!")
2495
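# Illustrative sketch (not part of the original file): the usual thrashing
# pattern is to disturb an osd and then block on wait_for_clean(), which keeps
# extending its own deadline while recovery is making progress.
def _example_thrash_and_settle(manager):
    manager.kill_osd(0)
    manager.mark_down_osd(0)
    manager.revive_osd(0)
    manager.wait_till_osd_is_up(0)
    manager.wait_for_clean(timeout=1200)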
2496 def are_all_osds_up(self):
2497 """
2498 Returns true if all osds are up.
2499 """
2500 x = self.get_osd_dump()
2501 return (len(x) == sum([(y['up'] > 0) for y in x]))
2502
2503 def wait_for_all_osds_up(self, timeout=None):
2504 """
2505 When this exits, either the timeout has expired, or all
2506 osds are up.
2507 """
2508 self.log("waiting for all up")
2509 start = time.time()
2510 while not self.are_all_osds_up():
2511 if timeout is not None:
2512 assert time.time() - start < timeout, \
2513 'timeout expired in wait_for_all_osds_up'
2514 time.sleep(3)
2515 self.log("all up!")
2516
2517 def pool_exists(self, pool):
2518 return pool in self.list_pools()
2521
2522 def wait_for_pool(self, pool, timeout=300):
2523 """
2524 Wait for a pool to exist
2525 """
2526 self.log('waiting for pool %s to exist' % pool)
2527 start = time.time()
2528 while not self.pool_exists(pool):
2529 if timeout is not None:
2530 assert time.time() - start < timeout, \
2531 'timeout expired in wait_for_pool'
2532 time.sleep(3)
2533
2534 def wait_for_pools(self, pools):
2535 for pool in pools:
2536 self.wait_for_pool(pool)
2537
2538 def is_mgr_available(self):
2539 x = self.get_mgr_dump()
2540 return x.get('available', False)
2541
2542 def wait_for_mgr_available(self, timeout=None):
2543 self.log("waiting for mgr available")
2544 start = time.time()
2545 while not self.is_mgr_available():
2546 if timeout is not None:
2547 assert time.time() - start < timeout, \
2548 'timeout expired in wait_for_mgr_available'
2549 time.sleep(3)
2550 self.log("mgr available!")
2551
2552 def wait_for_recovery(self, timeout=None):
2553 """
2554 Check peering. When this exits, we have recovered.
2555 """
2556 self.log("waiting for recovery to complete")
2557 start = time.time()
2558 num_active_recovered = self.get_num_active_recovered()
2559 while not self.is_recovered():
2560 now = time.time()
2561 if timeout is not None:
2562 if self.get_is_making_recovery_progress():
2563 self.log("making progress, resetting timeout")
2564 start = time.time()
2565 else:
2566 self.log("no progress seen, keeping timeout for now")
2567 if now - start >= timeout:
2568 if self.is_recovered():
2569 break
2570 self.log('dumping pgs')
2571 out = self.raw_cluster_cmd('pg', 'dump')
2572 self.log(out)
2573 assert now - start < timeout, \
2574 'failed to recover before timeout expired'
2575 cur_active_recovered = self.get_num_active_recovered()
2576 if cur_active_recovered != num_active_recovered:
2577 start = time.time()
2578 num_active_recovered = cur_active_recovered
2579 time.sleep(3)
2580 self.log("recovered!")
2581
2582 def wait_for_active(self, timeout=None):
2583 """
2584 Check peering. When this exits, we are definitely active
2585 """
2586 self.log("waiting for peering to complete")
2587 start = time.time()
2588 num_active = self.get_num_active()
2589 while not self.is_active():
2590 if timeout is not None:
2591 if time.time() - start >= timeout:
2592 self.log('dumping pgs')
2593 out = self.raw_cluster_cmd('pg', 'dump')
2594 self.log(out)
2595 assert time.time() - start < timeout, \
2596 'failed to become active before timeout expired'
2597 cur_active = self.get_num_active()
2598 if cur_active != num_active:
2599 start = time.time()
2600 num_active = cur_active
2601 time.sleep(3)
2602 self.log("active!")
2603
2604 def wait_for_active_or_down(self, timeout=None):
2605 """
2606 Check peering. When this exits, we are definitely either
2607 active or down
2608 """
2609 self.log("waiting for peering to complete or become blocked")
2610 start = time.time()
2611 num_active_down = self.get_num_active_down()
2612 while not self.is_active_or_down():
2613 if timeout is not None:
2614 if time.time() - start >= timeout:
2615 self.log('dumping pgs')
2616 out = self.raw_cluster_cmd('pg', 'dump')
2617 self.log(out)
2618 assert time.time() - start < timeout, \
2619 'failed to become active or down before timeout expired'
2620 cur_active_down = self.get_num_active_down()
2621 if cur_active_down != num_active_down:
2622 start = time.time()
2623 num_active_down = cur_active_down
2624 time.sleep(3)
2625 self.log("active or down!")
2626
2627 def osd_is_up(self, osd):
2628 """
2629 Wrapper for osd check
2630 """
2631 osds = self.get_osd_dump()
2632 return osds[osd]['up'] > 0
2633
2634 def wait_till_osd_is_up(self, osd, timeout=None):
2635 """
2636 Loop waiting for osd.
2637 """
2638 self.log('waiting for osd.%d to be up' % osd)
2639 start = time.time()
2640 while not self.osd_is_up(osd):
2641 if timeout is not None:
2642 assert time.time() - start < timeout, \
2643 'osd.%d failed to come up before timeout expired' % osd
2644 time.sleep(3)
2645 self.log('osd.%d is up' % osd)
2646
2647 def is_active(self):
2648 """
2649 Wrapper to check if all pgs are active
2650 """
2651 return self.get_num_active() == self.get_num_pgs()
2652
2653 def all_active_or_peered(self):
2654 """
2655 Wrapper to check if all PGs are active or peered
2656 """
2657 pgs = self.get_pg_stats()
2658 return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)
2659
2660 def wait_till_active(self, timeout=None):
2661 """
2662 Wait until all pgs are active.
2663 """
2664 self.log("waiting till active")
2665 start = time.time()
2666 while not self.is_active():
2667 if timeout is not None:
2668 if time.time() - start >= timeout:
2669 self.log('dumping pgs')
2670 out = self.raw_cluster_cmd('pg', 'dump')
2671 self.log(out)
2672 assert time.time() - start < timeout, \
2673 'failed to become active before timeout expired'
2674 time.sleep(3)
2675 self.log("active!")
2676
2677 def wait_till_pg_convergence(self, timeout=None):
2678 start = time.time()
2679 old_stats = None
2680 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2681 if osd['in'] and osd['up']]
2682 while True:
2683 # strictly speaking, there is no need to wait for the mon. but due to
2684 # the "ms inject socket failures" setting, the osdmap could be delayed,
2685 # so the mgr is likely to ignore pg-stat messages for pgs serving newly
2686 # created pools that it does not yet know about. so, to make sure the
2687 # mgr is updated with the latest pg-stats, waiting for mon/mgr is
2688 # necessary.
2689 self.flush_pg_stats(active_osds)
2690 new_stats = dict((stat['pgid'], stat['state'])
2691 for stat in self.get_pg_stats())
2692 if old_stats == new_stats:
2693 return old_stats
2694 if timeout is not None:
2695 assert time.time() - start < timeout, \
2696 'failed to reach convergence before %d secs' % timeout
2697 old_stats = new_stats
2698 # longer than mgr_stats_period
2699 time.sleep(5 + 1)
2700
2701 def mark_out_osd(self, osd):
2702 """
2703 Wrapper to mark osd out.
2704 """
2705 self.raw_cluster_cmd('osd', 'out', str(osd))
2706
2707 def kill_osd(self, osd):
2708 """
2709 Kill osds by either power cycling (if indicated by the config)
2710 or by stopping.
2711 """
2712 if self.config.get('powercycle'):
2713 remote = self.find_remote('osd', osd)
2714 self.log('kill_osd on osd.{o} '
2715 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2716 self._assert_ipmi(remote)
2717 remote.console.power_off()
2718 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2719 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2720 self.inject_args(
2721 'osd', osd,
2722 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
2723 try:
2724 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2725 except:
2726 pass
2727 else:
2728 raise RuntimeError('osd.%s did not fail' % osd)
2729 else:
2730 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2731 else:
2732 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2733
2734 @staticmethod
2735 def _assert_ipmi(remote):
2736 assert remote.console.has_ipmi_credentials, (
2737 "powercycling requested but RemoteConsole is not "
2738 "initialized. Check ipmi config.")
2739
2740 def blackhole_kill_osd(self, osd):
2741 """
2742 Stop osd if nothing else works.
2743 """
2744 self.inject_args('osd', osd,
2745 'objectstore-blackhole', True)
2746 time.sleep(2)
2747 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2748
2749 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
2750 """
2751 Revive osds by either power cycling (if indicated by the config)
2752 or by restarting.
2753 """
2754 if self.config.get('powercycle'):
2755 remote = self.find_remote('osd', osd)
2756 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2757 format(o=osd, s=remote.name))
2758 self._assert_ipmi(remote)
2759 remote.console.power_on()
2760 if not remote.console.check_status(300):
2761 raise Exception('Failed to revive osd.{o} via ipmi'.
2762 format(o=osd))
2763 teuthology.reconnect(self.ctx, 60, [remote])
2764 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2765 self.make_admin_daemon_dir(remote)
2766 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2767 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2768
2769 if not skip_admin_check:
2770 # wait for dump_ops_in_flight; this command doesn't appear
2771 # until after the signal handler is installed and it is safe
2772 # to stop the osd again without making valgrind leak checks
2773 # unhappy. see #5924.
2774 self.wait_run_admin_socket('osd', osd,
2775 args=['dump_ops_in_flight'],
2776 timeout=timeout, stdout=DEVNULL)
2777
2778 def mark_down_osd(self, osd):
2779 """
2780 Cluster command wrapper
2781 """
2782 self.raw_cluster_cmd('osd', 'down', str(osd))
2783
2784 def mark_in_osd(self, osd):
2785 """
2786 Cluster command wrapper
2787 """
2788 self.raw_cluster_cmd('osd', 'in', str(osd))
2789
2790 def signal_osd(self, osd, sig, silent=False):
2791 """
2792 Wrapper to local get_daemon call which sends the given
2793 signal to the given osd.
2794 """
2795 self.ctx.daemons.get_daemon('osd', osd,
2796 self.cluster).signal(sig, silent=silent)
2797
2798 ## monitors
2799 def signal_mon(self, mon, sig, silent=False):
2800 """
2801 Wrapper to local get_daemon call
2802 """
2803 self.ctx.daemons.get_daemon('mon', mon,
2804 self.cluster).signal(sig, silent=silent)
2805
2806 def kill_mon(self, mon):
2807 """
2808 Kill the monitor by either power cycling (if the config says so),
2809 or by doing a stop.
2810 """
2811 if self.config.get('powercycle'):
2812 remote = self.find_remote('mon', mon)
2813 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2814 format(m=mon, s=remote.name))
2815 self._assert_ipmi(remote)
2816 remote.console.power_off()
2817 else:
2818 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2819
2820 def revive_mon(self, mon):
2821 """
2822 Restart by either power cycling (if the config says so),
2823 or by doing a normal restart.
2824 """
2825 if self.config.get('powercycle'):
2826 remote = self.find_remote('mon', mon)
2827 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2828 format(m=mon, s=remote.name))
2829 self._assert_ipmi(remote)
2830 remote.console.power_on()
2831 self.make_admin_daemon_dir(remote)
2832 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2833
2834 def revive_mgr(self, mgr):
2835 """
2836 Restart by either power cycling (if the config says so),
2837 or by doing a normal restart.
2838 """
2839 if self.config.get('powercycle'):
2840 remote = self.find_remote('mgr', mgr)
2841 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2842 format(m=mgr, s=remote.name))
2843 self._assert_ipmi(remote)
2844 remote.console.power_on()
2845 self.make_admin_daemon_dir(remote)
2846 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2847
2848 def get_mon_status(self, mon):
2849 """
2850 Extract all the monitor status information from the cluster
2851 """
2852 out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
2853 return json.loads(out)
2854
2855 def get_mon_quorum(self):
2856 """
2857 Extract monitor quorum information from the cluster
2858 """
2859 out = self.raw_cluster_cmd('quorum_status')
2860 j = json.loads(out)
2861 self.log('quorum_status is %s' % out)
2862 return j['quorum']
2863
2864 def wait_for_mon_quorum_size(self, size, timeout=300):
2865 """
2866 Loop until quorum size is reached.
2867 """
2868 self.log('waiting for quorum size %d' % size)
2869 start = time.time()
2870 while not len(self.get_mon_quorum()) == size:
2871 if timeout is not None:
2872 assert time.time() - start < timeout, \
2873 ('failed to reach quorum size %d '
2874 'before timeout expired' % size)
2875 time.sleep(3)
2876 self.log("quorum is size %d" % size)
2877
2878 def get_mon_health(self, debug=False):
2879 """
2880 Extract all the monitor health information.
2881 """
2882 out = self.raw_cluster_cmd('health', '--format=json')
2883 if debug:
2884 self.log('health:\n{h}'.format(h=out))
2885 return json.loads(out)
2886
2887 def wait_until_healthy(self, timeout=None):
2888 self.log("wait_until_healthy")
2889 start = time.time()
2890 while self.get_mon_health()['status'] != 'HEALTH_OK':
2891 if timeout is not None:
2892 assert time.time() - start < timeout, \
2893 'timeout expired in wait_until_healthy'
2894 time.sleep(3)
2895 self.log("wait_until_healthy done")
2896
2897 def get_filepath(self):
2898 """
2899 Return path to osd data with {id} needing to be replaced
2900 """
2901 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2902
2903 def make_admin_daemon_dir(self, remote):
2904 """
2905 Create /var/run/ceph directory on remote site.
2906
2907 :param ctx: Context
2908 :param remote: Remote site
2909 """
2910 remote.run(args=['sudo',
2911 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2912
2913 def get_service_task_status(self, service, status_key):
2914 """
2915 Return daemon task status for a given ceph service.
2916
2917 :param service: ceph service (mds, osd, etc...)
2918 :param status_key: matching task status key
2919 """
2920 task_status = {}
2921 status = self.raw_cluster_status()
2922 try:
2923 for k,v in status['servicemap']['services'][service]['daemons'].items():
2924 ts = dict(v).get('task_status', None)
2925 if ts:
2926 task_status[k] = ts[status_key]
2927 except KeyError: # catches missing service and status key
2928 return {}
2929 self.log(task_status)
2930 return task_status
2931
2932 def utility_task(name):
2933 """
2934 Generate ceph_manager subtask corresponding to ceph_manager
2935 method name
2936 """
2937 def task(ctx, config):
2938 if config is None:
2939 config = {}
2940 args = config.get('args', [])
2941 kwargs = config.get('kwargs', {})
2942 cluster = config.get('cluster', 'ceph')
2943 fn = getattr(ctx.managers[cluster], name)
2944 fn(*args, **kwargs)
2945 return task
2946
2947 revive_osd = utility_task("revive_osd")
2948 revive_mon = utility_task("revive_mon")
2949 kill_osd = utility_task("kill_osd")
2950 kill_mon = utility_task("kill_mon")
2951 create_pool = utility_task("create_pool")
2952 remove_pool = utility_task("remove_pool")
2953 wait_for_clean = utility_task("wait_for_clean")
2954 flush_all_pg_stats = utility_task("flush_all_pg_stats")
2955 set_pool_property = utility_task("set_pool_property")
2956 do_pg_scrub = utility_task("do_pg_scrub")
2957 wait_for_pool = utility_task("wait_for_pool")
2958 wait_for_pools = utility_task("wait_for_pools")
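# Illustrative sketch (not part of the original file): each utility_task()
# binding above is a teuthology task; a job step such as
# "ceph_manager.create_pool" with args/kwargs boils down to a call like the
# following (the pool name is hypothetical).
def _example_utility_task(ctx):
    create_pool(ctx, {'args': ['unique_pool_example'],
                      'kwargs': {'pg_num': 32}})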