1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from functools import wraps
5 import contextlib
6 import random
7 import signal
8 import time
9 import gevent
10 import base64
11 import json
12 import logging
13 import threading
14 import traceback
15 import os
16 import six
17
18 from io import BytesIO
19 from teuthology import misc as teuthology
20 from tasks.scrub import Scrubber
21 from tasks.util.rados import cmd_erasure_code_profile
22 from tasks.util import get_remote
23 from teuthology.contextutil import safe_while
24 from teuthology.orchestra.remote import Remote
25 from teuthology.orchestra import run
26 from teuthology.exceptions import CommandFailedError
27 from tasks.thrasher import Thrasher
28 from six import StringIO
29
30 try:
31 from subprocess import DEVNULL # py3k
32 except ImportError:
33 DEVNULL = open(os.devnull, 'r+') # type: ignore
34
35 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
36
37 log = logging.getLogger(__name__)
38
39 # this is for cephadm clusters
40 def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
41 testdir = teuthology.get_testdir(ctx)
42 extra_args = []
43 if name:
44 extra_args = ['-n', name]
45 return remote.run(
46 args=[
47 'sudo',
48 ctx.cephadm,
49 '--image', ctx.ceph[cluster_name].image,
50 'shell',
51 ] + extra_args + [
52 '--fsid', ctx.ceph[cluster_name].fsid,
53 '--',
54 ] + args,
55 **kwargs
56 )
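# For reference, the command assembled above runs tools inside the cephadm
# shell container; with hypothetical values for the image and fsid it ends up
# looking roughly like:
#
#   sudo /path/to/cephadm --image <container-image> shell -n osd.3 \
#       --fsid <cluster-fsid> -- ceph-objectstore-tool --op list-pgs
#
# (the trailing arguments come from the caller's `args` list).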
57
58 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
59 conf_fp = BytesIO()
60 ctx.ceph[cluster].conf.write(conf_fp)
61 conf_fp.seek(0)
62 writes = ctx.cluster.run(
63 args=[
64 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
65 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
66 'sudo', 'tee', conf_path, run.Raw('&&'),
67 'sudo', 'chmod', '0644', conf_path,
68 run.Raw('>'), '/dev/null',
69
70 ],
71 stdin=run.PIPE,
72 wait=False)
73 teuthology.feed_many_stdins_and_close(conf_fp, writes)
74 run.wait(writes)
75
76
77 def mount_osd_data(ctx, remote, cluster, osd):
78 """
79 Mount a remote OSD
80
81 :param ctx: Context
82 :param remote: Remote site
83 :param cluster: name of ceph cluster
84 :param osd: Osd name
85 """
86 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
87 role = "{0}.osd.{1}".format(cluster, osd)
88 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
89 if remote in ctx.disk_config.remote_to_roles_to_dev:
90 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
91 role = alt_role
92 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
93 return
94 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
95 mount_options = ctx.disk_config.\
96 remote_to_roles_to_dev_mount_options[remote][role]
97 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
98 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
99
100 log.info('Mounting osd.{o}: dev: {d}, host: {n}, cluster: {c}, '
101 'mountpoint: {p}, type: {t}, options: {v}'.format(
102 o=osd, d=dev, n=remote.name, p=mnt, t=fstype,
103 v=mount_options, c=cluster))
104
105 remote.run(
106 args=[
107 'sudo',
108 'mount',
109 '-t', fstype,
110 '-o', ','.join(mount_options),
111 dev,
112 mnt,
113 ]
114 )
115
116
117 def log_exc(func):
118 @wraps(func)
119 def wrapper(self):
120 try:
121 return func(self)
122 except:
123 self.log(traceback.format_exc())
124 raise
125 return wrapper
126
127
128 class PoolType:
129 REPLICATED = 1
130 ERASURE_CODED = 3
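# These values match the pool "type" field reported by `ceph osd dump`
# (1 = replicated, 3 = erasure coded), which is what test_pool_min_size()
# compares against below.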
131
132
133 class OSDThrasher(Thrasher):
134 """
135 Object used to thrash Ceph
136 """
137 def __init__(self, manager, config, name, logger):
138 super(OSDThrasher, self).__init__()
139
140 self.ceph_manager = manager
141 self.cluster = manager.cluster
142 self.ceph_manager.wait_for_clean()
143 osd_status = self.ceph_manager.get_osd_status()
144 self.in_osds = osd_status['in']
145 self.live_osds = osd_status['live']
146 self.out_osds = osd_status['out']
147 self.dead_osds = osd_status['dead']
148 self.stopping = False
149 self.logger = logger
150 self.config = config
151 self.name = name
152 self.revive_timeout = self.config.get("revive_timeout", 360)
153 self.pools_to_fix_pgp_num = set()
154 if self.config.get('powercycle'):
155 self.revive_timeout += 120
156 self.clean_wait = self.config.get('clean_wait', 0)
157 self.minin = self.config.get("min_in", 4)
158 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
159 self.sighup_delay = self.config.get('sighup_delay')
160 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
161 self.dump_ops_enable = self.config.get('dump_ops_enable')
162 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
163 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
164 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
165 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
166 self.random_eio = self.config.get('random_eio')
167 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
168
169 num_osds = self.in_osds + self.out_osds
170 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
171 self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
172 if self.config is None:
173 self.config = dict()
174 # prevent monitor from auto-marking things out while thrasher runs
175 # try both old and new tell syntax, in case we are testing old code
176 self.saved_options = []
177 # assuming that the default settings do not vary from one daemon to
178 # another
179 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
180 opts = [('mon', 'mon_osd_down_out_interval', 0)]
181 #why do we disable marking an OSD out automatically? :/
182 for service, opt, new_value in opts:
183 old_value = manager.get_config(first_mon[0],
184 first_mon[1],
185 opt)
186 self.saved_options.append((service, opt, old_value))
187 manager.inject_args(service, '*', opt, new_value)
188 # initialize ceph_objectstore_tool property - must be done before
189 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
190 if (self.config.get('powercycle') or
191 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
192 self.config.get('disable_objectstore_tool_tests', False)):
193 self.ceph_objectstore_tool = False
194 if self.config.get('powercycle'):
195 self.log("Unable to test ceph-objectstore-tool, "
196 "powercycle testing")
197 else:
198 self.log("Unable to test ceph-objectstore-tool, "
199 "not available on all OSD nodes")
200 else:
201 self.ceph_objectstore_tool = \
202 self.config.get('ceph_objectstore_tool', True)
203 # spawn do_thrash
204 self.thread = gevent.spawn(self.do_thrash)
205 if self.sighup_delay:
206 self.sighup_thread = gevent.spawn(self.do_sighup)
207 if self.optrack_toggle_delay:
208 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
209 if self.dump_ops_enable == "true":
210 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
211 if self.noscrub_toggle_delay:
212 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
213
214 def log(self, msg, *args, **kwargs):
215 self.logger.info(msg, *args, **kwargs)
216
217 def cmd_exists_on_osds(self, cmd):
218 if self.ceph_manager.cephadm:
219 return True
220 allremotes = self.ceph_manager.ctx.cluster.only(\
221 teuthology.is_type('osd', self.cluster)).remotes.keys()
222 allremotes = list(set(allremotes))
223 for remote in allremotes:
224 proc = remote.run(args=['type', cmd], wait=True,
225 check_status=False, stdout=BytesIO(),
226 stderr=BytesIO())
227 if proc.exitstatus != 0:
228 return False
229 return True
230
231 def run_ceph_objectstore_tool(self, remote, osd, cmd):
232 if self.ceph_manager.cephadm:
233 return shell(
234 self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
235 args=['ceph-objectstore-tool'] + cmd,
236 name=osd,
237 wait=True, check_status=False,
238 stdout=StringIO(),
239 stderr=StringIO())
240 else:
241 return remote.run(
242 args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd,
243 wait=True, check_status=False,
244 stdout=StringIO(),
245 stderr=StringIO())
246
247 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
248 """
249 :param osd: Osd to be killed.
250 :mark_down: Mark down if true.
251 :mark_out: Mark out if true.
252 """
253 if osd is None:
254 osd = random.choice(self.live_osds)
255 self.log("Killing osd %s, live_osds are %s" % (str(osd),
256 str(self.live_osds)))
257 self.live_osds.remove(osd)
258 self.dead_osds.append(osd)
259 self.ceph_manager.kill_osd(osd)
260 if mark_down:
261 self.ceph_manager.mark_down_osd(osd)
262 if mark_out and osd in self.in_osds:
263 self.out_osd(osd)
264 if self.ceph_objectstore_tool:
265 self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
266 remote = self.ceph_manager.find_remote('osd', osd)
267 FSPATH = self.ceph_manager.get_filepath()
268 JPATH = os.path.join(FSPATH, "journal")
269 exp_osd = imp_osd = osd
270 self.log('remote for osd %s is %s' % (osd, remote))
271 exp_remote = imp_remote = remote
272 # If an older osd is available we'll move a pg from there
273 if (len(self.dead_osds) > 1 and
274 random.random() < self.chance_move_pg):
275 exp_osd = random.choice(self.dead_osds[:-1])
276 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
277 self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
278 prefix = [
279 '--no-mon-config',
280 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
281 ]
282
283 if not self.ceph_manager.cephadm:
284 # ceph-objectstore-tool might be temporarily absent during an
285 # upgrade - see http://tracker.ceph.com/issues/18014
286 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
287 while proceed():
288 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
289 wait=True, check_status=False, stdout=BytesIO(),
290 stderr=BytesIO())
291 if proc.exitstatus == 0:
292 break
293 log.debug("ceph-objectstore-tool binary not present, trying again")
294
295 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
296 # see http://tracker.ceph.com/issues/19556
297 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
298 while proceed():
299 proc = self.run_ceph_objectstore_tool(
300 exp_remote, 'osd.%s' % exp_osd,
301 prefix + [
302 '--data-path', FSPATH.format(id=exp_osd),
303 '--journal-path', JPATH.format(id=exp_osd),
304 '--op', 'list-pgs',
305 ])
306 if proc.exitstatus == 0:
307 break
308 elif (proc.exitstatus == 1 and
309 proc.stderr.getvalue() == "OSD has the store locked"):
310 continue
311 else:
312 raise Exception("ceph-objectstore-tool: "
313 "exp list-pgs failure with status {ret}".
314 format(ret=proc.exitstatus))
315
316 pgs = proc.stdout.getvalue().split('\n')[:-1]
317 if len(pgs) == 0:
318 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
319 return
320 pg = random.choice(pgs)
321 #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
322 #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
323 exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
324 "exp.{pg}.{id}".format(
325 pg=pg,
326 id=exp_osd))
327 if self.ceph_manager.cephadm:
328 exp_host_path = os.path.join(
329 '/var/log/ceph',
330 self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
331 "exp.{pg}.{id}".format(
332 pg=pg,
333 id=exp_osd))
334 else:
335 exp_host_path = exp_path
336
337 # export
338 # Can't use new export-remove op since this is part of upgrade testing
339 proc = self.run_ceph_objectstore_tool(
340 exp_remote, 'osd.%s' % exp_osd,
341 prefix + [
342 '--data-path', FSPATH.format(id=exp_osd),
343 '--journal-path', JPATH.format(id=exp_osd),
344 '--op', 'export',
345 '--pgid', pg,
346 '--file', exp_path,
347 ])
348 if proc.exitstatus:
349 raise Exception("ceph-objectstore-tool: "
350 "export failure with status {ret}".
351 format(ret=proc.exitstatus))
352 # remove
353 proc = self.run_ceph_objectstore_tool(
354 exp_remote, 'osd.%s' % exp_osd,
355 prefix + [
356 '--data-path', FSPATH.format(id=exp_osd),
357 '--journal-path', JPATH.format(id=exp_osd),
358 '--force',
359 '--op', 'remove',
360 '--pgid', pg,
361 ])
362 if proc.exitstatus:
363 raise Exception("ceph-objectstore-tool: "
364 "remove failure with status {ret}".
365 format(ret=proc.exitstatus))
366 # If there are at least 2 dead osds we might move the pg
367 if exp_osd != imp_osd:
368 # If pg isn't already on this osd, then we will move it there
369 proc = self.run_ceph_objectstore_tool(
370 imp_remote,
371 'osd.%s' % imp_osd,
372 prefix + [
373 '--data-path', FSPATH.format(id=imp_osd),
374 '--journal-path', JPATH.format(id=imp_osd),
375 '--op', 'list-pgs',
376 ])
377 if proc.exitstatus:
378 raise Exception("ceph-objectstore-tool: "
379 "imp list-pgs failure with status {ret}".
380 format(ret=proc.exitstatus))
381 pgs = proc.stdout.getvalue().split('\n')[:-1]
382 if pg not in pgs:
383 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
384 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
385 if imp_remote != exp_remote:
386 # Copy export file to the other machine
387 self.log("Transfer export file from {srem} to {trem}".
388 format(srem=exp_remote, trem=imp_remote))
389 # just in case an upgrade makes /var/log/ceph unreadable by non-root
390 exp_remote.run(args=['sudo', 'chmod', '777',
391 '/var/log/ceph'])
392 imp_remote.run(args=['sudo', 'chmod', '777',
393 '/var/log/ceph'])
394 tmpexport = Remote.get_file(exp_remote, exp_host_path,
395 sudo=True)
396 if exp_host_path != exp_path:
397 # push to /var/log/ceph, then rename (we can't
398 # chmod 777 the /var/log/ceph/$fsid mountpoint)
399 Remote.put_file(imp_remote, tmpexport, exp_path)
400 imp_remote.run(args=[
401 'sudo', 'mv', exp_path, exp_host_path])
402 else:
403 Remote.put_file(imp_remote, tmpexport, exp_host_path)
404 os.remove(tmpexport)
405 else:
406 # Can't move the pg after all
407 imp_osd = exp_osd
408 imp_remote = exp_remote
409 # import
410 proc = self.run_ceph_objectstore_tool(
411 imp_remote, 'osd.%s' % imp_osd,
412 [
413 '--data-path', FSPATH.format(id=imp_osd),
414 '--journal-path', JPATH.format(id=imp_osd),
415 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
416 '--op', 'import',
417 '--file', exp_path,
418 ])
419 if proc.exitstatus == 1:
420 bogosity = "The OSD you are using is older than the exported PG"
421 if bogosity in proc.stderr.getvalue():
422 self.log("OSD older than exported PG"
423 "...ignored")
424 elif proc.exitstatus == 10:
425 self.log("Pool went away before processing an import"
426 "...ignored")
427 elif proc.exitstatus == 11:
428 self.log("Attempt to import an incompatible export"
429 "...ignored")
430 elif proc.exitstatus == 12:
431 # this should be safe to ignore because we only ever move 1
432 # copy of the pg at a time, and merge is only initiated when
433 # all replicas are peered and happy. /me crosses fingers
434 self.log("PG merged on target"
435 "...ignored")
436 elif proc.exitstatus:
437 raise Exception("ceph-objectstore-tool: "
438 "import failure with status {ret}".
439 format(ret=proc.exitstatus))
440 cmd = "sudo rm -f {file}".format(file=exp_host_path)
441 exp_remote.run(args=cmd)
442 if imp_remote != exp_remote:
443 imp_remote.run(args=cmd)
444
445 # apply low split settings to each pool
446 if not self.ceph_manager.cephadm:
447 for pool in self.ceph_manager.list_pools():
448 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
449 "--filestore-split-multiple 1' sudo -E "
450 + 'ceph-objectstore-tool '
451 + ' '.join(prefix + [
452 '--data-path', FSPATH.format(id=imp_osd),
453 '--journal-path', JPATH.format(id=imp_osd),
454 ])
455 + " --op apply-layout-settings --pool " + pool).format(id=osd)
456 proc = imp_remote.run(args=cmd,
457 wait=True, check_status=False,
458 stderr=StringIO())
459 if 'Couldn\'t find pool' in proc.stderr.getvalue():
460 continue
461 if proc.exitstatus:
462 raise Exception("ceph-objectstore-tool apply-layout-settings"
463 " failed with {status}".format(status=proc.exitstatus))
464
465
466 def blackhole_kill_osd(self, osd=None):
467 """
468 If all else fails, kill the osd.
469 :param osd: Osd to be killed.
470 """
471 if osd is None:
472 osd = random.choice(self.live_osds)
473 self.log("Blackholing and then killing osd %s, live_osds are %s" %
474 (str(osd), str(self.live_osds)))
475 self.live_osds.remove(osd)
476 self.dead_osds.append(osd)
477 self.ceph_manager.blackhole_kill_osd(osd)
478
479 def revive_osd(self, osd=None, skip_admin_check=False):
480 """
481 Revive the osd.
482 :param osd: Osd to be revived.
483 """
484 if osd is None:
485 osd = random.choice(self.dead_osds)
486 self.log("Reviving osd %s" % (str(osd),))
487 self.ceph_manager.revive_osd(
488 osd,
489 self.revive_timeout,
490 skip_admin_check=skip_admin_check)
491 self.dead_osds.remove(osd)
492 self.live_osds.append(osd)
493 if self.random_eio > 0 and osd == self.rerrosd:
494 self.ceph_manager.set_config(self.rerrosd,
495 filestore_debug_random_read_err = self.random_eio)
496 self.ceph_manager.set_config(self.rerrosd,
497 bluestore_debug_random_read_err = self.random_eio)
498
499
500 def out_osd(self, osd=None):
501 """
502 Mark the osd out
503 :param osd: Osd to be marked.
504 """
505 if osd is None:
506 osd = random.choice(self.in_osds)
507 self.log("Removing osd %s, in_osds are: %s" %
508 (str(osd), str(self.in_osds)))
509 self.ceph_manager.mark_out_osd(osd)
510 self.in_osds.remove(osd)
511 self.out_osds.append(osd)
512
513 def in_osd(self, osd=None):
514 """
515 Mark the osd in
516 :param osd: Osd to be marked.
517 """
518 if osd is None:
519 osd = random.choice(self.out_osds)
520 if osd in self.dead_osds:
521 return self.revive_osd(osd)
522 self.log("Adding osd %s" % (str(osd),))
523 self.out_osds.remove(osd)
524 self.in_osds.append(osd)
525 self.ceph_manager.mark_in_osd(osd)
526 self.log("Added osd %s" % (str(osd),))
527
528 def reweight_osd_or_by_util(self, osd=None):
529 """
530 Reweight an osd that is in
531 :param osd: Osd to be marked.
532 """
533 if osd is not None or random.choice([True, False]):
534 if osd is None:
535 osd = random.choice(self.in_osds)
536 val = random.uniform(.1, 1.0)
537 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
538 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
539 str(osd), str(val))
540 else:
541 # do it several times, the option space is large
542 for i in range(5):
543 options = {
544 'max_change': random.choice(['0.05', '1.0', '3.0']),
545 'overage': random.choice(['110', '1000']),
546 'type': random.choice([
547 'reweight-by-utilization',
548 'test-reweight-by-utilization']),
549 }
550 self.log("Reweighting by: %s"%(str(options),))
551 self.ceph_manager.raw_cluster_cmd(
552 'osd',
553 options['type'],
554 options['overage'],
555 options['max_change'])
556
557 def primary_affinity(self, osd=None):
558 if osd is None:
559 osd = random.choice(self.in_osds)
560 if random.random() >= .5:
561 pa = random.random()
562 elif random.random() >= .5:
563 pa = 1
564 else:
565 pa = 0
566 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
567 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
568 str(osd), str(pa))
569
570 def thrash_cluster_full(self):
571 """
572 Set and unset cluster full condition
573 """
574 self.log('Setting full ratio to .001')
575 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
576 time.sleep(1)
577 self.log('Setting full ratio back to .95')
578 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
579
580 def thrash_pg_upmap(self):
581 """
582 Install or remove random pg_upmap entries in OSDMap
583 """
584 from random import shuffle
585 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
586 j = json.loads(out)
587 self.log('j is %s' % j)
588 try:
589 if random.random() >= .3:
590 pgs = self.ceph_manager.get_pg_stats()
591 if not pgs:
592 return
593 pg = random.choice(pgs)
594 pgid = str(pg['pgid'])
595 poolid = int(pgid.split('.')[0])
596 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
597 if len(sizes) == 0:
598 return
599 n = sizes[0]
600 osds = self.in_osds + self.out_osds
601 shuffle(osds)
602 osds = osds[0:n]
603 self.log('Setting %s to %s' % (pgid, osds))
604 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
605 self.log('cmd %s' % cmd)
606 self.ceph_manager.raw_cluster_cmd(*cmd)
607 else:
608 m = j['pg_upmap']
609 if len(m) > 0:
610 shuffle(m)
611 pg = m[0]['pgid']
612 self.log('Clearing pg_upmap on %s' % pg)
613 self.ceph_manager.raw_cluster_cmd(
614 'osd',
615 'rm-pg-upmap',
616 pg)
617 else:
618 self.log('No pg_upmap entries; doing nothing')
619 except CommandFailedError:
620 self.log('Failed to rm-pg-upmap, ignoring')
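# pg-upmap entries override the CRUSH-computed mapping for a single PG, so
# the branch above either pins a random PG onto `size` shuffled OSDs or
# clears an existing override, exercising how the cluster copes with these
# OSDMap entries while other thrashing is going on.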
621
622 def thrash_pg_upmap_items(self):
623 """
624 Install or remove random pg_upmap_items entries in OSDMap
625 """
626 from random import shuffle
627 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
628 j = json.loads(out)
629 self.log('j is %s' % j)
630 try:
631 if random.random() >= .3:
632 pgs = self.ceph_manager.get_pg_stats()
633 if not pgs:
634 return
635 pg = random.choice(pgs)
636 pgid = str(pg['pgid'])
637 poolid = int(pgid.split('.')[0])
638 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
639 if len(sizes) == 0:
640 return
641 n = sizes[0]
642 osds = self.in_osds + self.out_osds
643 shuffle(osds)
644 osds = osds[0:n*2]
645 self.log('Setting %s to %s' % (pgid, osds))
646 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
647 self.log('cmd %s' % cmd)
648 self.ceph_manager.raw_cluster_cmd(*cmd)
649 else:
650 m = j['pg_upmap_items']
651 if len(m) > 0:
652 shuffle(m)
653 pg = m[0]['pgid']
654 self.log('Clearing pg_upmap_items on %s' % pg)
655 self.ceph_manager.raw_cluster_cmd(
656 'osd',
657 'rm-pg-upmap-items',
658 pg)
659 else:
660 self.log('No pg_upmap_items entries; doing nothing')
661 except CommandFailedError:
662 self.log('Failed to rm-pg-upmap-items, ignoring')
663
664 def force_recovery(self):
665 """
666 Force recovery or backfill on some PGs
667 """
668 backfill = random.random() >= 0.5
669 j = self.ceph_manager.get_pgids_to_force(backfill)
670 if j:
671 try:
672 if backfill:
673 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
674 else:
675 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
676 except CommandFailedError:
677 self.log('Failed to force backfill|recovery, ignoring')
678
679
680 def cancel_force_recovery(self):
681 """
682 Cancel forced recovery or backfill on some PGs
683 """
684 backfill = random.random() >= 0.5
685 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
686 if j:
687 try:
688 if backfill:
689 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
690 else:
691 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
692 except CommandFailedError:
693 self.log('Failed to force backfill|recovery, ignoring')
694
695 def force_cancel_recovery(self):
696 """
697 Force or cancel forcing recovery
698 """
699 if random.random() >= 0.4:
700 self.force_recovery()
701 else:
702 self.cancel_force_recovery()
703
704 def all_up(self):
705 """
706 Make sure all osds are up and not out.
707 """
708 while len(self.dead_osds) > 0:
709 self.log("reviving osd")
710 self.revive_osd()
711 while len(self.out_osds) > 0:
712 self.log("inning osd")
713 self.in_osd()
714
715 def all_up_in(self):
716 """
717 Make sure all osds are up and fully in.
718 """
719 self.all_up()
720 for osd in self.live_osds:
721 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
722 str(osd), str(1))
723 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
724 str(osd), str(1))
725
726 def do_join(self):
727 """
728 Stop the thrasher and join all of its greenlets
729 """
730 self.stopping = True
731 self.thread.get()
732 if self.sighup_delay:
733 self.log("joining the do_sighup greenlet")
734 self.sighup_thread.get()
735 if self.optrack_toggle_delay:
736 self.log("joining the do_optrack_toggle greenlet")
737 self.optrack_toggle_thread.join()
738 if self.dump_ops_enable == "true":
739 self.log("joining the do_dump_ops greenlet")
740 self.dump_ops_thread.join()
741 if self.noscrub_toggle_delay:
742 self.log("joining the do_noscrub_toggle greenlet")
743 self.noscrub_toggle_thread.join()
744
745 def grow_pool(self):
746 """
747 Increase the size of the pool
748 """
749 pool = self.ceph_manager.get_pool()
750 if pool is None:
751 return
752 self.log("Growing pool %s" % (pool,))
753 if self.ceph_manager.expand_pool(pool,
754 self.config.get('pool_grow_by', 10),
755 self.max_pgs):
756 self.pools_to_fix_pgp_num.add(pool)
757
758 def shrink_pool(self):
759 """
760 Decrease the size of the pool
761 """
762 pool = self.ceph_manager.get_pool()
763 if pool is None:
764 return
765 _ = self.ceph_manager.get_pool_pg_num(pool)
766 self.log("Shrinking pool %s" % (pool,))
767 if self.ceph_manager.contract_pool(
768 pool,
769 self.config.get('pool_shrink_by', 10),
770 self.min_pgs):
771 self.pools_to_fix_pgp_num.add(pool)
772
773 def fix_pgp_num(self, pool=None):
774 """
775 Fix number of pgs in pool.
776 """
777 if pool is None:
778 pool = self.ceph_manager.get_pool()
779 if not pool:
780 return
781 force = False
782 else:
783 force = True
784 self.log("fixing pg num pool %s" % (pool,))
785 if self.ceph_manager.set_pool_pgpnum(pool, force):
786 self.pools_to_fix_pgp_num.discard(pool)
787
788 def test_pool_min_size(self):
789 """
790 Loop to selectively push PGs below their min_size and test that recovery
791 still occurs.
792 """
793 self.log("test_pool_min_size")
794 self.all_up()
795 self.ceph_manager.wait_for_recovery(
796 timeout=self.config.get('timeout')
797 )
798
799 minout = int(self.config.get("min_out", 1))
800 minlive = int(self.config.get("min_live", 2))
801 mindead = int(self.config.get("min_dead", 1))
802 self.log("doing min_size thrashing")
803 self.ceph_manager.wait_for_clean(timeout=60)
804 assert self.ceph_manager.is_clean(), \
805 'not clean before minsize thrashing starts'
806 while not self.stopping:
807 # look up k and m from all the pools on each loop, in case it
808 # changes as the cluster runs
809 k = 0
810 m = 99
811 has_pools = False
812 pools_json = self.ceph_manager.get_osd_dump_json()['pools']
813
814 for pool_json in pools_json:
815 pool = pool_json['pool_name']
816 has_pools = True
817 pool_type = pool_json['type'] # 1 for rep, 3 for ec
818 min_size = pool_json['min_size']
819 self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
820 try:
821 ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
822 if pool_type != PoolType.ERASURE_CODED:
823 continue
824 ec_profile = pool_json['erasure_code_profile']
825 ec_profile_json = self.ceph_manager.raw_cluster_cmd(
826 'osd',
827 'erasure-code-profile',
828 'get',
829 ec_profile,
830 '--format=json')
831 ec_json = json.loads(ec_profile_json)
832 local_k = int(ec_json['k'])
833 local_m = int(ec_json['m'])
834 self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
835 k=local_k, m=local_m))
836 if local_k > k:
837 self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
838 k = local_k
839 if local_m < m:
840 self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
841 m = local_m
842 except CommandFailedError:
843 self.log("failed to read erasure_code_profile. %s was likely removed", pool)
844 continue
845
846 if has_pools:
847 self.log("using k={k}, m={m}".format(k=k,m=m))
848 else:
849 self.log("No pools yet, waiting")
850 time.sleep(5)
851 continue
852
853 if minout > len(self.out_osds): # kill OSDs and mark out
854 self.log("forced to out an osd")
855 self.kill_osd(mark_out=True)
856 continue
857 elif mindead > len(self.dead_osds): # kill OSDs but force timeout
858 self.log("forced to kill an osd")
859 self.kill_osd()
860 continue
861 else: # make mostly-random choice to kill or revive OSDs
862 minup = max(minlive, k)
863 rand_val = random.uniform(0, 1)
864 self.log("choosing based on number of live OSDs and rand val {rand}".\
865 format(rand=rand_val))
866 if len(self.live_osds) > minup+1 and rand_val < 0.5:
867 # chose to knock out as many OSDs as we can w/out downing PGs
868
869 most_killable = min(len(self.live_osds) - minup, m)
870 self.log("chose to kill {n} OSDs".format(n=most_killable))
871 for i in range(1, most_killable):
872 self.kill_osd(mark_out=True)
873 time.sleep(10)
874 # try a few times since there might be a concurrent pool
875 # creation or deletion
876 with safe_while(
877 sleep=5, tries=5,
878 action='check for active or peered') as proceed:
879 while proceed():
880 if self.ceph_manager.all_active_or_peered():
881 break
882 self.log('not all PGs are active or peered')
883 else: # chose to revive OSDs, bring up a random fraction of the dead ones
884 self.log("chose to revive osds")
885 for i in range(1, int(rand_val * len(self.dead_osds))):
886 self.revive_osd(i)
887
888 # let PGs repair themselves or our next knockout might kill one
889 self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
890
891 # / while not self.stopping
892 self.all_up_in()
893
894 self.ceph_manager.wait_for_recovery(
895 timeout=self.config.get('timeout')
896 )
897
898 def inject_pause(self, conf_key, duration, check_after, should_be_down):
899 """
900 Pause injection testing. Check for osd being down when finished.
901 """
902 the_one = random.choice(self.live_osds)
903 self.log("inject_pause on {osd}".format(osd=the_one))
904 self.log(
905 "Testing {key} pause injection for duration {duration}".format(
906 key=conf_key,
907 duration=duration
908 ))
909 self.log(
910 "Checking after {after}, should_be_down={shouldbedown}".format(
911 after=check_after,
912 shouldbedown=should_be_down
913 ))
914 self.ceph_manager.set_config(the_one, **{conf_key: duration})
915 if not should_be_down:
916 return
917 time.sleep(check_after)
918 status = self.ceph_manager.get_osd_status()
919 assert the_one in status['down']
920 time.sleep(duration - check_after + 20)
921 status = self.ceph_manager.get_osd_status()
922 assert the_one not in status['down']
923
924 def test_backfill_full(self):
925 """
926 Test backfills stopping when the replica fills up.
927
928 First, use injectfull admin command to simulate a now full
929 osd by setting it to 0 on all of the OSDs.
930
931 Second, on a random subset, set
932 osd_debug_skip_full_check_in_backfill_reservation to force
933 the more complicated check in do_scan to be exercised.
934
935 Then, verify that all backfillings stop.
936 """
937 self.log("injecting backfill full")
938 for i in self.live_osds:
939 self.ceph_manager.set_config(
940 i,
941 osd_debug_skip_full_check_in_backfill_reservation=
942 random.choice(['false', 'true']))
943 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
944 check_status=True, timeout=30, stdout=DEVNULL)
945 for i in range(30):
946 status = self.ceph_manager.compile_pg_status()
947 if 'backfilling' not in status.keys():
948 break
949 self.log(
950 "waiting for {still_going} backfillings".format(
951 still_going=status.get('backfilling')))
952 time.sleep(1)
953 assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
954 for i in self.live_osds:
955 self.ceph_manager.set_config(
956 i,
957 osd_debug_skip_full_check_in_backfill_reservation='false')
958 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
959 check_status=True, timeout=30, stdout=DEVNULL)
960
961 def test_map_discontinuity(self):
962 """
963 1) Allows the osds to recover
964 2) kills an osd
965 3) allows the remaining osds to recover
966 4) waits for some time
967 5) revives the osd
968 This sequence should cause the revived osd to have to handle
969 a map gap since the mons would have trimmed
970 """
971 while len(self.in_osds) < (self.minin + 1):
972 self.in_osd()
973 self.log("Waiting for recovery")
974 self.ceph_manager.wait_for_all_osds_up(
975 timeout=self.config.get('timeout')
976 )
977 # now we wait 20s for the pg status to change, if it takes longer,
978 # the test *should* fail!
979 time.sleep(20)
980 self.ceph_manager.wait_for_clean(
981 timeout=self.config.get('timeout')
982 )
983
984 # now we wait 20s for the backfill replicas to hear about the clean
985 time.sleep(20)
986 self.log("Recovered, killing an osd")
987 self.kill_osd(mark_down=True, mark_out=True)
988 self.log("Waiting for clean again")
989 self.ceph_manager.wait_for_clean(
990 timeout=self.config.get('timeout')
991 )
992 self.log("Waiting for trim")
993 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
994 self.revive_osd()
995
996 def choose_action(self):
997 """
998 Random action selector.
999 """
1000 chance_down = self.config.get('chance_down', 0.4)
1001 _ = self.config.get('chance_test_min_size', 0)
1002 chance_test_backfill_full = \
1003 self.config.get('chance_test_backfill_full', 0)
1004 if isinstance(chance_down, int):
1005 chance_down = float(chance_down) / 100
1006 minin = self.minin
1007 minout = int(self.config.get("min_out", 0))
1008 minlive = int(self.config.get("min_live", 2))
1009 mindead = int(self.config.get("min_dead", 0))
1010
1011 self.log('choose_action: min_in %d min_out '
1012 '%d min_live %d min_dead %d' %
1013 (minin, minout, minlive, mindead))
1014 actions = []
1015 if len(self.in_osds) > minin:
1016 actions.append((self.out_osd, 1.0,))
1017 if len(self.live_osds) > minlive and chance_down > 0:
1018 actions.append((self.kill_osd, chance_down,))
1019 if len(self.out_osds) > minout:
1020 actions.append((self.in_osd, 1.7,))
1021 if len(self.dead_osds) > mindead:
1022 actions.append((self.revive_osd, 1.0,))
1023 if self.config.get('thrash_primary_affinity', True):
1024 actions.append((self.primary_affinity, 1.0,))
1025 actions.append((self.reweight_osd_or_by_util,
1026 self.config.get('reweight_osd', .5),))
1027 actions.append((self.grow_pool,
1028 self.config.get('chance_pgnum_grow', 0),))
1029 actions.append((self.shrink_pool,
1030 self.config.get('chance_pgnum_shrink', 0),))
1031 actions.append((self.fix_pgp_num,
1032 self.config.get('chance_pgpnum_fix', 0),))
1033 actions.append((self.test_pool_min_size,
1034 self.config.get('chance_test_min_size', 0),))
1035 actions.append((self.test_backfill_full,
1036 chance_test_backfill_full,))
1037 if self.chance_thrash_cluster_full > 0:
1038 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
1039 if self.chance_thrash_pg_upmap > 0:
1040 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
1041 if self.chance_thrash_pg_upmap_items > 0:
1042 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
1043 if self.chance_force_recovery > 0:
1044 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
1045
1046 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
1047 for scenario in [
1048 (lambda key=key:
1049 self.inject_pause(key,
1050 self.config.get('pause_short', 3),
1051 0,
1052 False),
1053 self.config.get('chance_inject_pause_short', 1),),
1054 (lambda key=key:
1055 self.inject_pause(key,
1056 self.config.get('pause_long', 80),
1057 self.config.get('pause_check_after', 70),
1058 True),
1059 self.config.get('chance_inject_pause_long', 0),)]:
1060 actions.append(scenario)
1061
1062 total = sum([y for (x, y) in actions])
1063 val = random.uniform(0, total)
1064 for (action, prob) in actions:
1065 if val < prob:
1066 return action
1067 val -= prob
1068 return None
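# The probabilities above are relative weights, not absolute chances: the
# selection is a standard roulette-wheel draw.  A minimal standalone sketch
# of the same idea (hypothetical weights):
#
#     actions = [(do_a, 1.0), (do_b, 0.5)]
#     total = sum(w for _, w in actions)
#     val = random.uniform(0, total)
#     for action, w in actions:
#         if val < w:
#             return action        # do_a wins ~2/3 of the time
#         val -= w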
1069
1070 def do_thrash(self):
1071 """
1072 _do_thrash() wrapper.
1073 """
1074 try:
1075 self._do_thrash()
1076 except Exception as e:
1077 # See _run exception comment for MDSThrasher
1078 self.set_thrasher_exception(e)
1079 self.logger.exception("exception:")
1080 # Allow successful completion so gevent doesn't see an exception.
1081 # The DaemonWatchdog will observe the error and tear down the test.
1082
1083 @log_exc
1084 def do_sighup(self):
1085 """
1086 Loops and sends signal.SIGHUP to a random live osd.
1087
1088 Loop delay is controlled by the config value sighup_delay.
1089 """
1090 delay = float(self.sighup_delay)
1091 self.log("starting do_sighup with a delay of {0}".format(delay))
1092 while not self.stopping:
1093 osd = random.choice(self.live_osds)
1094 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
1095 time.sleep(delay)
1096
1097 @log_exc
1098 def do_optrack_toggle(self):
1099 """
1100 Loops and toggles op tracking on all osds.
1101
1102 Loop delay is controlled by the config value optrack_toggle_delay.
1103 """
1104 delay = float(self.optrack_toggle_delay)
1105 osd_state = "true"
1106 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
1107 while not self.stopping:
1108 if osd_state == "true":
1109 osd_state = "false"
1110 else:
1111 osd_state = "true"
1112 try:
1113 self.ceph_manager.inject_args('osd', '*',
1114 'osd_enable_op_tracker',
1115 osd_state)
1116 except CommandFailedError:
1117 self.log('Failed to tell all osds, ignoring')
1118 gevent.sleep(delay)
1119
1120 @log_exc
1121 def do_dump_ops(self):
1122 """
1123 Loops and does op dumps on all osds
1124 """
1125 self.log("starting do_dump_ops")
1126 while not self.stopping:
1127 for osd in self.live_osds:
1128 # Ignore errors because live_osds is in flux
1129 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
1130 check_status=False, timeout=30, stdout=DEVNULL)
1131 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
1132 check_status=False, timeout=30, stdout=DEVNULL)
1133 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
1134 check_status=False, timeout=30, stdout=DEVNULL)
1135 gevent.sleep(0)
1136
1137 @log_exc
1138 def do_noscrub_toggle(self):
1139 """
1140 Loops and toggles the noscrub flags.
1141
1142 Loop delay is controlled by the config value noscrub_toggle_delay.
1143 """
1144 delay = float(self.noscrub_toggle_delay)
1145 scrub_state = "none"
1146 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
1147 while not self.stopping:
1148 if scrub_state == "none":
1149 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
1150 scrub_state = "noscrub"
1151 elif scrub_state == "noscrub":
1152 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
1153 scrub_state = "both"
1154 elif scrub_state == "both":
1155 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1156 scrub_state = "nodeep-scrub"
1157 else:
1158 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1159 scrub_state = "none"
1160 gevent.sleep(delay)
1161 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1162 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1163
1164 @log_exc
1165 def _do_thrash(self):
1166 """
1167 Loop to select random actions to thrash ceph manager with.
1168 """
1169 cleanint = self.config.get("clean_interval", 60)
1170 scrubint = self.config.get("scrub_interval", -1)
1171 maxdead = self.config.get("max_dead", 0)
1172 delay = self.config.get("op_delay", 5)
1173 self.rerrosd = self.live_osds[0]
1174 if self.random_eio > 0:
1175 self.ceph_manager.inject_args('osd', self.rerrosd,
1176 'filestore_debug_random_read_err',
1177 self.random_eio)
1178 self.ceph_manager.inject_args('osd', self.rerrosd,
1179 'bluestore_debug_random_read_err',
1180 self.random_eio)
1181 self.log("starting do_thrash")
1182 while not self.stopping:
1183 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1184 "out_osds: ", self.out_osds,
1185 "dead_osds: ", self.dead_osds,
1186 "live_osds: ", self.live_osds]]
1187 self.log(" ".join(to_log))
1188 if random.uniform(0, 1) < (float(delay) / cleanint):
1189 while len(self.dead_osds) > maxdead:
1190 self.revive_osd()
1191 for osd in self.in_osds:
1192 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1193 str(osd), str(1))
1194 if random.uniform(0, 1) < float(
1195 self.config.get('chance_test_map_discontinuity', 0)) \
1196 and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1197 self.test_map_discontinuity()
1198 else:
1199 self.ceph_manager.wait_for_recovery(
1200 timeout=self.config.get('timeout')
1201 )
1202 time.sleep(self.clean_wait)
1203 if scrubint > 0:
1204 if random.uniform(0, 1) < (float(delay) / scrubint):
1205 self.log('Scrubbing while thrashing being performed')
1206 Scrubber(self.ceph_manager, self.config)
1207 self.choose_action()()
1208 time.sleep(delay)
1209 self.all_up()
1210 if self.random_eio > 0:
1211 self.ceph_manager.inject_args('osd', self.rerrosd,
1212 'filestore_debug_random_read_err', '0.0')
1213 self.ceph_manager.inject_args('osd', self.rerrosd,
1214 'bluestore_debug_random_read_err', '0.0')
1215 for pool in list(self.pools_to_fix_pgp_num):
1216 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1217 self.fix_pgp_num(pool)
1218 self.pools_to_fix_pgp_num.clear()
1219 for service, opt, saved_value in self.saved_options:
1220 self.ceph_manager.inject_args(service, '*', opt, saved_value)
1221 self.saved_options = []
1222 self.all_up_in()
1223
1224
1225 class ObjectStoreTool:
1226
1227 def __init__(self, manager, pool, **kwargs):
1228 self.manager = manager
1229 self.pool = pool
1230 self.osd = kwargs.get('osd', None)
1231 self.object_name = kwargs.get('object_name', None)
1232 self.do_revive = kwargs.get('do_revive', True)
1233 if self.osd and self.pool and self.object_name:
1234 if self.osd == "primary":
1235 self.osd = self.manager.get_object_primary(self.pool,
1236 self.object_name)
1237 assert self.osd
1238 if self.object_name:
1239 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1240 self.object_name,
1241 self.osd)
1242 self.remote = next(iter(self.manager.ctx.\
1243 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
1244 path = self.manager.get_filepath().format(id=self.osd)
1245 self.paths = ("--data-path {path} --journal-path {path}/journal".
1246 format(path=path))
1247
1248 def build_cmd(self, options, args, stdin):
1249 lines = []
1250 if self.object_name:
1251 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1252 "{paths} --pgid {pgid} --op list |"
1253 "grep '\"oid\":\"{name}\"')".
1254 format(paths=self.paths,
1255 pgid=self.pgid,
1256 name=self.object_name))
1257 args = '"$object" ' + args
1258 options += " --pgid {pgid}".format(pgid=self.pgid)
1259 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1260 format(paths=self.paths,
1261 args=args,
1262 options=options))
1263 if stdin:
1264 cmd = ("echo {payload} | base64 --decode | {cmd}".
1265 format(payload=base64.b64encode(six.ensure_binary(stdin)).decode(),
1266 cmd=cmd))
1267 lines.append(cmd)
1268 return "\n".join(lines)
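# build_cmd() returns a small shell script.  With an object name set it looks
# roughly like the following (paths, pgid and oid are hypothetical):
#
#     object=$(sudo adjust-ulimits ceph-objectstore-tool --data-path ... \
#         --journal-path ... --pgid 2.7 --op list | grep '"oid":"myobj"')
#     sudo adjust-ulimits ceph-objectstore-tool --data-path ... \
#         --journal-path ... <options> --pgid 2.7 "$object" <args>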
1269
1270 def run(self, options, args):
1271 self.manager.kill_osd(self.osd)
1272 cmd = self.build_cmd(options, args, None)
1273 self.manager.log(cmd)
1274 try:
1275 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1276 check_status=False,
1277 stdout=BytesIO(),
1278 stderr=BytesIO())
1279 proc.wait()
1280 if proc.exitstatus != 0:
1281 self.manager.log("failed with " + str(proc.exitstatus))
1282 error = proc.stdout.getvalue().decode() + " " + \
1283 proc.stderr.getvalue().decode()
1284 raise Exception(error)
1285 finally:
1286 if self.do_revive:
1287 self.manager.revive_osd(self.osd)
1288 self.manager.wait_till_osd_is_up(self.osd, 300)
1289
1290
1291 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1292 # the same name.
1293 class CephManager:
1294 """
1295 Ceph manager object.
1296 Contains several local functions that form a bulk of this module.
1297
1298 :param controller: the remote machine where the Ceph commands should be
1299 executed
1300 :param ctx: the cluster context
1301 :param config: path to Ceph config file
1302 :param logger: for logging messages
1303 :param cluster: name of the Ceph cluster
1304 """
1305
1306 def __init__(self, controller, ctx=None, config=None, logger=None,
1307 cluster='ceph', cephadm=False):
1308 self.lock = threading.RLock()
1309 self.ctx = ctx
1310 self.config = config
1311 self.controller = controller
1312 self.next_pool_id = 0
1313 self.cluster = cluster
1314 self.cephadm = cephadm
1315 if (logger):
1316 self.log = lambda x: logger.info(x)
1317 else:
1318 def tmp(x):
1319 """
1320 implement log behavior.
1321 """
1322 print(x)
1323 self.log = tmp
1324 if self.config is None:
1325 self.config = dict()
1326 pools = self.list_pools()
1327 self.pools = {}
1328 for pool in pools:
1329 # we may race with a pool deletion; ignore failures here
1330 try:
1331 self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
1332 except CommandFailedError:
1333 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1334
1335 def raw_cluster_cmd(self, *args):
1336 """
1337 Run a ceph command against the cluster and return its stdout as a string.
1338 """
1339 if self.cephadm:
1340 proc = shell(self.ctx, self.cluster, self.controller,
1341 args=['ceph'] + list(args),
1342 stdout=BytesIO())
1343 else:
1344 testdir = teuthology.get_testdir(self.ctx)
1345 ceph_args = [
1346 'sudo',
1347 'adjust-ulimits',
1348 'ceph-coverage',
1349 '{tdir}/archive/coverage'.format(tdir=testdir),
1350 'timeout',
1351 '120',
1352 'ceph',
1353 '--cluster',
1354 self.cluster,
1355 '--log-early',
1356 ]
1357 ceph_args.extend(args)
1358 proc = self.controller.run(
1359 args=ceph_args,
1360 stdout=BytesIO(),
1361 )
1362 return six.ensure_str(proc.stdout.getvalue())
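# Example use, where `manager` is a CephManager instance (the caller is
# responsible for parsing any JSON output):
#
#     out = manager.raw_cluster_cmd('osd', 'dump', '--format=json')
#     osd_dump = json.loads(out)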
1363
1364 def raw_cluster_cmd_result(self, *args, **kwargs):
1365 """
1366 Run a ceph command against the cluster and return its exit status.
1367 """
1368 if self.cephadm:
1369 proc = shell(self.ctx, self.cluster, self.controller,
1370 args=['ceph'] + list(args),
1371 check_status=False)
1372 else:
1373 testdir = teuthology.get_testdir(self.ctx)
1374 ceph_args = [
1375 'sudo',
1376 'adjust-ulimits',
1377 'ceph-coverage',
1378 '{tdir}/archive/coverage'.format(tdir=testdir),
1379 'timeout',
1380 '120',
1381 'ceph',
1382 '--cluster',
1383 self.cluster,
1384 ]
1385 ceph_args.extend(args)
1386 kwargs['args'] = ceph_args
1387 kwargs['check_status'] = False
1388 proc = self.controller.run(**kwargs)
1389 return proc.exitstatus
1390
1391 def run_ceph_w(self, watch_channel=None):
1392 """
1393 Execute "ceph -w" in the background with stdout connected to a BytesIO,
1394 and return the RemoteProcess.
1395
1396 :param watch_channel: Specifies the channel to be watched. This can be
1397 'cluster', 'audit', ...
1398 :type watch_channel: str
1399 """
1400 args = ["sudo",
1401 "daemon-helper",
1402 "kill",
1403 "ceph",
1404 '--cluster',
1405 self.cluster,
1406 "-w"]
1407 if watch_channel is not None:
1408 args.append("--watch-channel")
1409 args.append(watch_channel)
1410 return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
1411
1412 def get_mon_socks(self):
1413 """
1414 Get monitor sockets.
1415
1416 :return socks: tuple of strings; strings are individual sockets.
1417 """
1418 from json import loads
1419
1420 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1421 socks = []
1422 for mon in output['mons']:
1423 for addrvec_mem in mon['public_addrs']['addrvec']:
1424 socks.append(addrvec_mem['addr'])
1425 return tuple(socks)
1426
1427 def get_msgrv1_mon_socks(self):
1428 """
1429 Get monitor sockets that use msgrv1 to operate.
1430
1431 :return socks: tuple of strings; strings are individual sockets.
1432 """
1433 from json import loads
1434
1435 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1436 socks = []
1437 for mon in output['mons']:
1438 for addrvec_mem in mon['public_addrs']['addrvec']:
1439 if addrvec_mem['type'] == 'v1':
1440 socks.append(addrvec_mem['addr'])
1441 return tuple(socks)
1442
1443 def get_msgrv2_mon_socks(self):
1444 """
1445 Get monitor sockets that use msgrv2 to operate.
1446
1447 :return socks: tuple of strings; strings are individual sockets.
1448 """
1449 from json import loads
1450
1451 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1452 socks = []
1453 for mon in output['mons']:
1454 for addrvec_mem in mon['public_addrs']['addrvec']:
1455 if addrvec_mem['type'] == 'v2':
1456 socks.append(addrvec_mem['addr'])
1457 return tuple(socks)
1458
1459 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1460 """
1461 Flush pg stats from a list of OSD ids, ensuring they are reflected
1462 all the way to the monitor. Luminous and later only.
1463
1464 :param osds: list of OSDs to flush
1465 :param no_wait: list of OSDs not to wait for seq id. by default, we
1466 wait for all specified osds, but some of them could be
1467 moved out of osdmap, so we cannot get their updated
1468 stat seq from monitor anymore. in that case, you need
1469 to pass a blacklist.
1470 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1471 it. (5 min by default)
1472 """
1473 seq = {osd: int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
1474 for osd in osds}
1475 if not wait_for_mon:
1476 return
1477 if no_wait is None:
1478 no_wait = []
1479 for osd, need in seq.items():
1480 if osd in no_wait:
1481 continue
1482 got = 0
1483 while wait_for_mon > 0:
1484 got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
1485 self.log('need seq {need} got {got} for osd.{osd}'.format(
1486 need=need, got=got, osd=osd))
1487 if got >= need:
1488 break
1489 A_WHILE = 1
1490 time.sleep(A_WHILE)
1491 wait_for_mon -= A_WHILE
1492 else:
1493 raise Exception('timed out waiting for mon to be updated with '
1494 'osd.{osd}: {got} < {need}'.
1495 format(osd=osd, got=got, need=need))
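# Typical call, e.g. after restarting a set of OSDs (ids are illustrative):
#
#     manager.flush_pg_stats([0, 1, 2])
#
# which returns only once `osd last-stat-seq` on the monitor has caught up
# with the sequence number each OSD reported from its flush_pg_stats tell.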
1496
1497 def flush_all_pg_stats(self):
1498 self.flush_pg_stats(range(len(self.get_osd_dump())))
1499
1500 def do_rados(self, remote, cmd, check_status=True):
1501 """
1502 Execute a remote rados command.
1503 """
1504 testdir = teuthology.get_testdir(self.ctx)
1505 pre = [
1506 'adjust-ulimits',
1507 'ceph-coverage',
1508 '{tdir}/archive/coverage'.format(tdir=testdir),
1509 'rados',
1510 '--cluster',
1511 self.cluster,
1512 ]
1513 pre.extend(cmd)
1514 proc = remote.run(
1515 args=pre,
1516 wait=True,
1517 check_status=check_status
1518 )
1519 return proc
1520
1521 def rados_write_objects(self, pool, num_objects, size,
1522 timelimit, threads, cleanup=False):
1523 """
1524 Write rados objects
1525 Threads not used yet.
1526 """
1527 args = [
1528 '-p', pool,
1529 '--num-objects', num_objects,
1530 '-b', size,
1531 'bench', timelimit,
1532 'write'
1533 ]
1534 if not cleanup:
1535 args.append('--no-cleanup')
1536 return self.do_rados(self.controller, map(str, args))
1537
1538 def do_put(self, pool, obj, fname, namespace=None):
1539 """
1540 Implement rados put operation
1541 """
1542 args = ['-p', pool]
1543 if namespace is not None:
1544 args += ['-N', namespace]
1545 args += [
1546 'put',
1547 obj,
1548 fname
1549 ]
1550 return self.do_rados(
1551 self.controller,
1552 args,
1553 check_status=False
1554 ).exitstatus
1555
1556 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1557 """
1558 Implement rados get operation
1559 """
1560 args = ['-p', pool]
1561 if namespace is not None:
1562 args += ['-N', namespace]
1563 args += [
1564 'get',
1565 obj,
1566 fname
1567 ]
1568 return self.do_rados(
1569 self.controller,
1570 args,
1571 check_status=False
1572 ).exitstatus
1573
1574 def do_rm(self, pool, obj, namespace=None):
1575 """
1576 Implement rados rm operation
1577 """
1578 args = ['-p', pool]
1579 if namespace is not None:
1580 args += ['-N', namespace]
1581 args += [
1582 'rm',
1583 obj
1584 ]
1585 return self.do_rados(
1586 self.controller,
1587 args,
1588 check_status=False
1589 ).exitstatus
1590
1591 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1592 if stdout is None:
1593 stdout = StringIO()
1594 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1595
1596 def find_remote(self, service_type, service_id):
1597 """
1598 Get the Remote for the host where a particular service runs.
1599
1600 :param service_type: 'mds', 'osd', 'client'
1601 :param service_id: The second part of a role, e.g. '0' for
1602 the role 'client.0'
1603 :return: a Remote instance for the host where the
1604 requested role is placed
1605 """
1606 return get_remote(self.ctx, self.cluster,
1607 service_type, service_id)
1608
1609 def admin_socket(self, service_type, service_id,
1610 command, check_status=True, timeout=0, stdout=None):
1611 """
1612 Remotely start up ceph specifying the admin socket
1613 :param command: a list of words to use as the command
1614 to the admin socket
1615 """
1616 if stdout is None:
1617 stdout = StringIO()
1618
1619 remote = self.find_remote(service_type, service_id)
1620
1621 if self.cephadm:
1622 return shell(
1623 self.ctx, self.cluster, remote,
1624 args=[
1625 'ceph', 'daemon', '%s.%s' % (service_type, service_id),
1626 ] + command,
1627 stdout=stdout,
1628 wait=True,
1629 check_status=check_status,
1630 )
1631
1632 testdir = teuthology.get_testdir(self.ctx)
1633 args = [
1634 'sudo',
1635 'adjust-ulimits',
1636 'ceph-coverage',
1637 '{tdir}/archive/coverage'.format(tdir=testdir),
1638 'timeout',
1639 str(timeout),
1640 'ceph',
1641 '--cluster',
1642 self.cluster,
1643 '--admin-daemon',
1644 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1645 cluster=self.cluster,
1646 type=service_type,
1647 id=service_id),
1648 ]
1649 args.extend(command)
1650 return remote.run(
1651 args=args,
1652 stdout=stdout,
1653 wait=True,
1654 check_status=check_status
1655 )
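# Example admin-socket query (service id and option name are illustrative):
#
#     manager.admin_socket('osd', 0, ['config', 'get', 'osd_max_backfills'])
#
# The command is passed as a list of words, exactly as it would follow
# `ceph daemon osd.0` on the command line.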
1656
1657 def objectstore_tool(self, pool, options, args, **kwargs):
1658 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1659
1660 def get_pgid(self, pool, pgnum):
1661 """
1662 :param pool: pool name
1663 :param pgnum: pg number
1664 :returns: a string representing this pg.
1665 """
1666 poolnum = self.get_pool_num(pool)
1667 pg_str = "{poolnum}.{pgnum}".format(
1668 poolnum=poolnum,
1669 pgnum=pgnum)
1670 return pg_str
1671
1672 def get_pg_replica(self, pool, pgnum):
1673 """
1674 get replica for pool, pgnum (e.g. (data, 0) -> 0)
1675 """
1676 pg_str = self.get_pgid(pool, pgnum)
1677 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1678 j = json.loads('\n'.join(output.split('\n')[1:]))
1679 return int(j['acting'][-1])
1680 assert False
1681
1682 def wait_for_pg_stats(func):
1683 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1684 # by default, and take the faulty injection in ms into consideration,
1685 # 12 seconds are more than enough
1686 delays = [1, 1, 2, 3, 5, 8, 13, 0]
1687 @wraps(func)
1688 def wrapper(self, *args, **kwargs):
1689 exc = None
1690 for delay in delays:
1691 try:
1692 return func(self, *args, **kwargs)
1693 except AssertionError as e:
1694 time.sleep(delay)
1695 exc = e
1696 raise exc
1697 return wrapper
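# wait_for_pg_stats decorates CephManager methods that assert on freshly
# reported stats; each AssertionError is retried after the next delay in
# `delays` until the list is exhausted.  Hypothetical usage sketch:
#
#     @wait_for_pg_stats
#     def num_pgs_checked(self):        # hypothetical example method
#         stats = self.get_pg_stats()
#         assert len(stats) == self.get_num_pgs()
#         return len(stats)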
1698
1699 def get_pg_primary(self, pool, pgnum):
1700 """
1701 get primary for pool, pgnum (e.g. (data, 0) -> 0)
1702 """
1703 pg_str = self.get_pgid(pool, pgnum)
1704 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1705 j = json.loads('\n'.join(output.split('\n')[1:]))
1706 return int(j['acting'][0])
1707 assert False
1708
1709 def get_pool_num(self, pool):
1710 """
1711 get number for pool (e.g., data -> 2)
1712 """
1713 return int(self.get_pool_dump(pool)['pool'])
1714
1715 def list_pools(self):
1716 """
1717 list all pool names
1718 """
1719 osd_dump = self.get_osd_dump_json()
1720 self.log(osd_dump['pools'])
1721 return [str(i['pool_name']) for i in osd_dump['pools']]
1722
1723 def clear_pools(self):
1724 """
1725 remove all pools
1726 """
1727 [self.remove_pool(i) for i in self.list_pools()]
1728
1729 def kick_recovery_wq(self, osdnum):
1730 """
1731 Run kick_recovery_wq on cluster.
1732 """
1733 return self.raw_cluster_cmd(
1734 'tell', "osd.%d" % (int(osdnum),),
1735 'debug',
1736 'kick_recovery_wq',
1737 '0')
1738
1739 def wait_run_admin_socket(self, service_type,
1740 service_id, args=['version'], timeout=75, stdout=None):
1741 """
1742 If osd_admin_socket call succeeds, return. Otherwise wait
1743 five seconds and try again.
1744 """
1745 if stdout is None:
1746 stdout = StringIO()
1747 tries = 0
1748 while True:
1749 proc = self.admin_socket(service_type, service_id,
1750 args, check_status=False, stdout=stdout)
1751 if proc.exitstatus == 0:
1752 return proc
1753 else:
1754 tries += 1
1755 if (tries * 5) > timeout:
1756 raise Exception('timed out waiting for admin_socket '
1757 'to appear after {type}.{id} restart'.
1758 format(type=service_type,
1759 id=service_id))
1760 self.log("waiting on admin_socket for {type}-{id}, "
1761 "{command}".format(type=service_type,
1762 id=service_id,
1763 command=args))
1764 time.sleep(5)
1765
1766 def get_pool_dump(self, pool):
1767 """
1768 get the osd dump part of a pool
1769 """
1770 osd_dump = self.get_osd_dump_json()
1771 for i in osd_dump['pools']:
1772 if i['pool_name'] == pool:
1773 return i
1774 assert False
1775
1776 def get_config(self, service_type, service_id, name):
1777 """
1778 :param service_type: daemon type, e.g. 'mon'
:param service_id: daemon id, e.g. 'a'
1779 :param name: the option name
1780 """
1781 proc = self.wait_run_admin_socket(service_type, service_id,
1782 ['config', 'show'])
1783 j = json.loads(proc.stdout.getvalue())
1784 return j[name]
1785
1786 def inject_args(self, service_type, service_id, name, value):
1787 whom = '{0}.{1}'.format(service_type, service_id)
1788 if isinstance(value, bool):
1789 value = 'true' if value else 'false'
1790 opt_arg = '--{name}={value}'.format(name=name, value=value)
1791 self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
1792
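# Hedged example of inject_args (the option name below is a placeholder,
# not a real ceph option): booleans are rendered as 'true'/'false' and the
# call becomes 'ceph -- tell osd.2 injectargs --some_debug_option=true':
#
#   manager.inject_args('osd', 2, 'some_debug_option', True)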
1793 def set_config(self, osdnum, **argdict):
1794 """
1795 :param osdnum: osd number
1796 :param argdict: dictionary containing values to set.
1797 """
1798 for k, v in argdict.items():
1799 self.wait_run_admin_socket(
1800 'osd', osdnum,
1801 ['config', 'set', str(k), str(v)])
1802
1803 def raw_cluster_status(self):
1804 """
1805 Get status from cluster
1806 """
1807 status = self.raw_cluster_cmd('status', '--format=json')
1808 return json.loads(status)
1809
1810 def raw_osd_status(self):
1811 """
1812 Get osd status from cluster
1813 """
1814 return self.raw_cluster_cmd('osd', 'dump')
1815
1816 def get_osd_status(self):
1817 """
1818 Get osd statuses sorted by states that the osds are in.
1819 """
1820 osd_lines = list(filter(
1821 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1822 self.raw_osd_status().split('\n')))
1823 self.log(osd_lines)
1824 in_osds = [int(i[4:].split()[0])
1825 for i in filter(lambda x: " in " in x, osd_lines)]
1826 out_osds = [int(i[4:].split()[0])
1827 for i in filter(lambda x: " out " in x, osd_lines)]
1828 up_osds = [int(i[4:].split()[0])
1829 for i in filter(lambda x: " up " in x, osd_lines)]
1830 down_osds = [int(i[4:].split()[0])
1831 for i in filter(lambda x: " down " in x, osd_lines)]
1832 dead_osds = [int(x.id_)
1833 for x in filter(lambda x:
1834 not x.running(),
1835 self.ctx.daemons.
1836 iter_daemons_of_role('osd', self.cluster))]
1837 live_osds = [int(x.id_) for x in
1838 filter(lambda x:
1839 x.running(),
1840 self.ctx.daemons.iter_daemons_of_role('osd',
1841 self.cluster))]
1842 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1843 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1844 'raw': osd_lines}
1845
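# The dict returned above groups osd ids by state, e.g. (illustrative
# shape only): {'in': [0, 1, 2], 'out': [], 'up': [0, 1], 'down': [2],
# 'dead': [2], 'live': [0, 1], 'raw': [...]}. 'dead'/'live' reflect the
# daemon process state tracked by teuthology, while 'up'/'down'/'in'/'out'
# are parsed from the 'ceph osd dump' output.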
1846 def get_num_pgs(self):
1847 """
1848 Check cluster status for the number of pgs
1849 """
1850 status = self.raw_cluster_status()
1851 self.log(status)
1852 return status['pgmap']['num_pgs']
1853
1854 def create_erasure_code_profile(self, profile_name, profile):
1855 """
1856 Create an erasure code profile that can be referenced by name
1857 when creating an erasure coded pool.
1858 """
1859 with self.lock:
1860 args = cmd_erasure_code_profile(profile_name, profile)
1861 self.raw_cluster_cmd(*args)
1862
1863 def create_pool_with_unique_name(self, pg_num=16,
1864 erasure_code_profile_name=None,
1865 min_size=None,
1866 erasure_code_use_overwrites=False):
1867 """
1868 Create a pool named unique_pool_X where X is unique.
1869 """
1870 name = ""
1871 with self.lock:
1872 name = "unique_pool_%s" % (str(self.next_pool_id),)
1873 self.next_pool_id += 1
1874 self.create_pool(
1875 name,
1876 pg_num,
1877 erasure_code_profile_name=erasure_code_profile_name,
1878 min_size=min_size,
1879 erasure_code_use_overwrites=erasure_code_use_overwrites)
1880 return name
1881
1882 @contextlib.contextmanager
1883 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1884 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1885 yield
1886 self.remove_pool(pool_name)
1887
1888 def create_pool(self, pool_name, pg_num=16,
1889 erasure_code_profile_name=None,
1890 min_size=None,
1891 erasure_code_use_overwrites=False):
1892 """
1893 Create a pool named from the pool_name parameter.
1894 :param pool_name: name of the pool being created.
1895 :param pg_num: initial number of pgs.
1896 :param erasure_code_profile_name: if set (not None), create an
1897 erasure coded pool using this profile
:param min_size: if not None, override the pool's min_size
1898 :param erasure_code_use_overwrites: if true, allow overwrites
1899 """
1900 with self.lock:
1901 assert isinstance(pool_name, str)
1902 assert isinstance(pg_num, int)
1903 assert pool_name not in self.pools
1904 self.log("creating pool_name %s" % (pool_name,))
1905 if erasure_code_profile_name:
1906 self.raw_cluster_cmd('osd', 'pool', 'create',
1907 pool_name, str(pg_num), str(pg_num),
1908 'erasure', erasure_code_profile_name)
1909 else:
1910 self.raw_cluster_cmd('osd', 'pool', 'create',
1911 pool_name, str(pg_num))
1912 if min_size is not None:
1913 self.raw_cluster_cmd(
1914 'osd', 'pool', 'set', pool_name,
1915 'min_size',
1916 str(min_size))
1917 if erasure_code_use_overwrites:
1918 self.raw_cluster_cmd(
1919 'osd', 'pool', 'set', pool_name,
1920 'allow_ec_overwrites',
1921 'true')
1922 self.raw_cluster_cmd(
1923 'osd', 'pool', 'application', 'enable',
1924 pool_name, 'rados', '--yes-i-really-mean-it',
1925 run.Raw('||'), 'true')
1926 self.pools[pool_name] = pg_num
1927 time.sleep(1)
1928
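# Hedged usage sketch (pool and profile names are illustrative, and the
# profile dict keys are assumed to be the k/m style accepted by
# cmd_erasure_code_profile in tasks.util.rados):
#
#   manager.create_erasure_code_profile('myprofile', {'k': 2, 'm': 1})
#   manager.create_pool('ecpool', pg_num=16,
#                       erasure_code_profile_name='myprofile',
#                       erasure_code_use_overwrites=True)
#
# A replicated pool only needs a name: manager.create_pool('testpool', 32)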
1929 def add_pool_snap(self, pool_name, snap_name):
1930 """
1931 Add pool snapshot
1932 :param pool_name: name of pool to snapshot
1933 :param snap_name: name of snapshot to take
1934 """
1935 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1936 str(pool_name), str(snap_name))
1937
1938 def remove_pool_snap(self, pool_name, snap_name):
1939 """
1940 Remove pool snapshot
1941 :param pool_name: name of pool to snapshot
1942 :param snap_name: name of snapshot to remove
1943 """
1944 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1945 str(pool_name), str(snap_name))
1946
1947 def remove_pool(self, pool_name):
1948 """
1949 Remove the indicated pool
1950 :param pool_name: Pool to be removed
1951 """
1952 with self.lock:
1953 assert isinstance(pool_name, str)
1954 assert pool_name in self.pools
1955 self.log("removing pool_name %s" % (pool_name,))
1956 del self.pools[pool_name]
1957 self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
1958 "--yes-i-really-really-mean-it")
1959
1960 def get_pool(self):
1961 """
1962 Pick a random pool
1963 """
1964 with self.lock:
1965 if self.pools:
1966 return random.sample(list(self.pools.keys()), 1)[0]
1967
1968 def get_pool_pg_num(self, pool_name):
1969 """
1970 Return the number of pgs in the pool specified.
1971 """
1972 with self.lock:
1973 assert isinstance(pool_name, str)
1974 if pool_name in self.pools:
1975 return self.pools[pool_name]
1976 return 0
1977
1978 def get_pool_property(self, pool_name, prop):
1979 """
1980 :param pool_name: pool
1981 :param prop: property to be checked.
1982 :returns: property as string
1983 """
1984 with self.lock:
1985 assert isinstance(pool_name, str)
1986 assert isinstance(prop, str)
1987 output = self.raw_cluster_cmd(
1988 'osd',
1989 'pool',
1990 'get',
1991 pool_name,
1992 prop)
1993 return output.split()[1]
1994
1995 def get_pool_int_property(self, pool_name, prop):
1996 return int(self.get_pool_property(pool_name, prop))
1997
1998 def set_pool_property(self, pool_name, prop, val):
1999 """
2000 :param pool_name: pool
2001 :param prop: property to be set.
2002 :param val: value to set.
2003
2004 This routine retries if set operation fails.
2005 """
2006 with self.lock:
2007 assert isinstance(pool_name, str)
2008 assert isinstance(prop, str)
2009 assert isinstance(val, int)
2010 tries = 0
2011 while True:
2012 r = self.raw_cluster_cmd_result(
2013 'osd',
2014 'pool',
2015 'set',
2016 pool_name,
2017 prop,
2018 str(val))
2019 if r != 11: # EAGAIN
2020 break
2021 tries += 1
2022 if tries > 50:
2023 raise Exception('timed out getting EAGAIN '
2024 'when setting pool property %s %s = %s' %
2025 (pool_name, prop, val))
2026 self.log('got EAGAIN setting pool property, '
2027 'waiting a few seconds...')
2028 time.sleep(2)
2029
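# Note: the retry loop above checks for a return code of 11 because errno
# EAGAIN is 11 on Linux; the mon typically returns -EAGAIN while a prior
# pg_num change is still being applied. Hedged example (pool name and
# value are illustrative):
#
#   manager.set_pool_property('unique_pool_0', 'pg_num', 32)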
2030 def expand_pool(self, pool_name, by, max_pgs):
2031 """
2032 Increase the number of pgs in a pool
2033 """
2034 with self.lock:
2035 assert isinstance(pool_name, str)
2036 assert isinstance(by, int)
2037 assert pool_name in self.pools
2038 if self.get_num_creating() > 0:
2039 return False
2040 if (self.pools[pool_name] + by) > max_pgs:
2041 return False
2042 self.log("increase pool size by %d" % (by,))
2043 new_pg_num = self.pools[pool_name] + by
2044 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2045 self.pools[pool_name] = new_pg_num
2046 return True
2047
2048 def contract_pool(self, pool_name, by, min_pgs):
2049 """
2050 Decrease the number of pgs in a pool
2051 """
2052 with self.lock:
2053 self.log('contract_pool %s by %s min %s' % (
2054 pool_name, str(by), str(min_pgs)))
2055 assert isinstance(pool_name, str)
2056 assert isinstance(by, int)
2057 assert pool_name in self.pools
2058 if self.get_num_creating() > 0:
2059 self.log('too many creating')
2060 return False
2061 proj = self.pools[pool_name] - by
2062 if proj < min_pgs:
2063 self.log('would drop below min_pgs, proj %d, currently %d' % (proj, self.pools[pool_name]))
2064 return False
2065 self.log("decrease pool size by %d" % (by,))
2066 new_pg_num = self.pools[pool_name] - by
2067 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2068 self.pools[pool_name] = new_pg_num
2069 return True
2070
2071 def stop_pg_num_changes(self):
2072 """
2073 Reset all pg_num_targets back to pg_num, canceling splits and merges
2074 """
2075 self.log('Canceling any pending splits or merges...')
2076 osd_dump = self.get_osd_dump_json()
2077 try:
2078 for pool in osd_dump['pools']:
2079 if pool['pg_num'] != pool['pg_num_target']:
2080 self.log('Setting pool %s (%d) pg_num %d -> %d' %
2081 (pool['pool_name'], pool['pool'],
2082 pool['pg_num_target'],
2083 pool['pg_num']))
2084 self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
2085 'pg_num', str(pool['pg_num']))
2086 except KeyError:
2087 # we don't support pg_num_target before nautilus
2088 pass
2089
2090 def set_pool_pgpnum(self, pool_name, force):
2091 """
2092 Set pgpnum property of pool_name pool.
2093 """
2094 with self.lock:
2095 assert isinstance(pool_name, str)
2096 assert pool_name in self.pools
2097 if not force and self.get_num_creating() > 0:
2098 return False
2099 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
2100 return True
2101
2102 def list_pg_unfound(self, pgid):
2103 """
2104 return list of unfound pgs with the id specified
2105 """
2106 r = None
2107 offset = {}
2108 while True:
2109 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
2110 json.dumps(offset))
2111 j = json.loads(out)
2112 if r is None:
2113 r = j
2114 else:
2115 r['objects'].extend(j['objects'])
2116 if 'more' not in j:
2117 break
2118 if j['more'] == 0:
2119 break
2120 offset = j['objects'][-1]['oid']
2121 if 'more' in r:
2122 del r['more']
2123 return r
2124
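# The loop above pages through 'ceph pg <pgid> list_unfound', using the oid
# of the last returned object as the offset for the next request, merging
# the 'objects' arrays and stopping once 'more' reports nothing further.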
2125 def get_pg_stats(self):
2126 """
2127 Dump the cluster and get pg stats
2128 """
2129 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
2130 j = json.loads('\n'.join(out.split('\n')[1:]))
2131 try:
2132 return j['pg_map']['pg_stats']
2133 except KeyError:
2134 return j['pg_stats']
2135
2136 def get_pgids_to_force(self, backfill):
2137 """
2138 Return the randomized list of PGs that can have their recovery/backfill forced
2139 """
2140 j = self.get_pg_stats()
2141 pgids = []
2142 if backfill:
2143 wanted = ['degraded', 'backfilling', 'backfill_wait']
2144 else:
2145 wanted = ['recovering', 'degraded', 'recovery_wait']
2146 for pg in j:
2147 status = pg['state'].split('+')
2148 for t in wanted:
2149 if (random.random() > 0.5 and
not ('forced_backfill' in status or 'forced_recovery' in status) and
t in status):
2150 pgids.append(pg['pgid'])
2151 break
2152 return pgids
2153
2154 def get_pgids_to_cancel_force(self, backfill):
2155 """
2156 Return the randomized list of PGs whose recovery/backfill priority is forced
2157 """
2158 j = self.get_pg_stats()
2159 pgids = []
2160 if backfill:
2161 wanted = 'forced_backfill'
2162 else:
2163 wanted = 'forced_recovery'
2164 for pg in j:
2165 status = pg['state'].split('+')
2166 if wanted in status and random.random() > 0.5:
2167 pgids.append(pg['pgid'])
2168 return pgids
2169
2170 def compile_pg_status(self):
2171 """
2172 Return a histogram of pg state values
2173 """
2174 ret = {}
2175 j = self.get_pg_stats()
2176 for pg in j:
2177 for status in pg['state'].split('+'):
2178 if status not in ret:
2179 ret[status] = 0
2180 ret[status] += 1
2181 return ret
2182
2183 @wait_for_pg_stats # type: ignore
2184 def with_pg_state(self, pool, pgnum, check):
2185 pgstr = self.get_pgid(pool, pgnum)
2186 stats = self.get_single_pg_stats(pgstr)
2187 assert check(stats['state'])
2188
2189 @wait_for_pg_stats # type: ignore
2190 def with_pg(self, pool, pgnum, check):
2191 pgstr = self.get_pgid(pool, pgnum)
2192 stats = self.get_single_pg_stats(pgstr)
2193 return check(stats)
2194
2195 def get_last_scrub_stamp(self, pool, pgnum):
2196 """
2197 Get the timestamp of the last scrub.
2198 """
2199 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
2200 return stats["last_scrub_stamp"]
2201
2202 def do_pg_scrub(self, pool, pgnum, stype):
2203 """
2204 Scrub pg and wait for scrubbing to finish
2205 """
2206 init = self.get_last_scrub_stamp(pool, pgnum)
2207 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
2208 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
2209 SLEEP_TIME = 10
2210 timer = 0
2211 while init == self.get_last_scrub_stamp(pool, pgnum):
2212 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
2213 self.log("waiting for scrub type %s" % (stype,))
2214 if (timer % RESEND_TIMEOUT) == 0:
2215 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
2216 # The first time in this loop is the actual request
2217 if timer != 0 and stype == "repair":
2218 self.log("WARNING: Resubmitted a non-idempotent repair")
2219 time.sleep(SLEEP_TIME)
2220 timer += SLEEP_TIME
2221
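# Hedged example (pool name and pg number are illustrative): request a
# deep scrub of one pg and block until its last_scrub_stamp changes. stype
# matches the 'ceph pg <stype> <pgid>' subcommands, i.e. 'scrub',
# 'deep-scrub' or 'repair':
#
#   manager.do_pg_scrub('unique_pool_0', 0, 'deep-scrub')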
2222 def wait_snap_trimming_complete(self, pool):
2223 """
2224 Wait for snap trimming on pool to end
2225 """
2226 POLL_PERIOD = 10
2227 FATAL_TIMEOUT = 600
2228 start = time.time()
2229 poolnum = self.get_pool_num(pool)
2230 poolnumstr = "%s." % (poolnum,)
2231 while True:
2232 now = time.time()
2233 assert (now - start) <= FATAL_TIMEOUT, \
2234 'failed to complete snap trimming before timeout'
2236 all_stats = self.get_pg_stats()
2237 trimming = False
2238 for pg in all_stats:
2239 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
2240 self.log("pg {pg} in trimming, state: {state}".format(
2241 pg=pg['pgid'],
2242 state=pg['state']))
2243 trimming = True
2244 if not trimming:
2245 break
2246 self.log("{pool} still trimming, waiting".format(pool=pool))
2247 time.sleep(POLL_PERIOD)
2248
2249 def get_single_pg_stats(self, pgid):
2250 """
2251 Return pg for the pgid specified.
2252 """
2253 all_stats = self.get_pg_stats()
2254
2255 for pg in all_stats:
2256 if pg['pgid'] == pgid:
2257 return pg
2258
2259 return None
2260
2261 def get_object_pg_with_shard(self, pool, name, osdid):
2262 """
2263 """
2264 pool_dump = self.get_pool_dump(pool)
2265 object_map = self.get_object_map(pool, name)
2266 if pool_dump["type"] == PoolType.ERASURE_CODED:
2267 shard = object_map['acting'].index(osdid)
2268 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
2269 shard=shard)
2270 else:
2271 return object_map['pgid']
2272
2273 def get_object_primary(self, pool, name):
2274 """
2275 """
2276 object_map = self.get_object_map(pool, name)
2277 return object_map['acting_primary']
2278
2279 def get_object_map(self, pool, name):
2280 """
2281 osd map --format=json converted to a python object
2282 :returns: the python object
2283 """
2284 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
2285 return json.loads('\n'.join(out.split('\n')[1:]))
2286
2287 def get_osd_dump_json(self):
2288 """
2289 osd dump --format=json converted to a python object
2290 :returns: the python object
2291 """
2292 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
2293 return json.loads('\n'.join(out.split('\n')[1:]))
2294
2295 def get_osd_dump(self):
2296 """
2297 Dump osds
2298 :returns: all osds
2299 """
2300 return self.get_osd_dump_json()['osds']
2301
2302 def get_osd_metadata(self):
2303 """
2304 osd metadata --format=json converted to a python object
2305 :returns: the python object containing osd metadata information
2306 """
2307 out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
2308 return json.loads('\n'.join(out.split('\n')[1:]))
2309
2310 def get_mgr_dump(self):
2311 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2312 return json.loads(out)
2313
2314 def get_stuck_pgs(self, type_, threshold):
2315 """
2316 :returns: stuck pg information from the cluster
2317 """
2318 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2319 '--format=json')
2320 return json.loads(out).get('stuck_pg_stats', [])
2321
2322 def get_num_unfound_objects(self):
2323 """
2324 Check cluster status to get the number of unfound objects
2325 """
2326 status = self.raw_cluster_status()
2327 self.log(status)
2328 return status['pgmap'].get('unfound_objects', 0)
2329
2330 def get_num_creating(self):
2331 """
2332 Find the number of pgs in creating mode.
2333 """
2334 pgs = self.get_pg_stats()
2335 num = 0
2336 for pg in pgs:
2337 if 'creating' in pg['state']:
2338 num += 1
2339 return num
2340
2341 def get_num_active_clean(self):
2342 """
2343 Find the number of active and clean pgs.
2344 """
2345 pgs = self.get_pg_stats()
2346 return self._get_num_active_clean(pgs)
2347
2348 def _get_num_active_clean(self, pgs):
2349 num = 0
2350 for pg in pgs:
2351 if (pg['state'].count('active') and
2352 pg['state'].count('clean') and
2353 not pg['state'].count('stale')):
2354 num += 1
2355 return num
2356
2357 def get_num_active_recovered(self):
2358 """
2359 Find the number of active and recovered pgs.
2360 """
2361 pgs = self.get_pg_stats()
2362 return self._get_num_active_recovered(pgs)
2363
2364 def _get_num_active_recovered(self, pgs):
2365 num = 0
2366 for pg in pgs:
2367 if (pg['state'].count('active') and
2368 not pg['state'].count('recover') and
2369 not pg['state'].count('backfilling') and
2370 not pg['state'].count('stale')):
2371 num += 1
2372 return num
2373
2374 def get_is_making_recovery_progress(self):
2375 """
2376 Return whether there is recovery progress discernible in the
2377 raw cluster status
2378 """
2379 status = self.raw_cluster_status()
2380 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2381 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2382 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2383 return kps > 0 or bps > 0 or ops > 0
2384
2385 def get_num_active(self):
2386 """
2387 Find the number of active pgs.
2388 """
2389 pgs = self.get_pg_stats()
2390 return self._get_num_active(pgs)
2391
2392 def _get_num_active(self, pgs):
2393 num = 0
2394 for pg in pgs:
2395 if pg['state'].count('active') and not pg['state'].count('stale'):
2396 num += 1
2397 return num
2398
2399 def get_num_down(self):
2400 """
2401 Find the number of pgs that are down.
2402 """
2403 pgs = self.get_pg_stats()
2404 num = 0
2405 for pg in pgs:
2406 if ((pg['state'].count('down') and not
2407 pg['state'].count('stale')) or
2408 (pg['state'].count('incomplete') and not
2409 pg['state'].count('stale'))):
2410 num += 1
2411 return num
2412
2413 def get_num_active_down(self):
2414 """
2415 Find the number of pgs that are either active or down.
2416 """
2417 pgs = self.get_pg_stats()
2418 return self._get_num_active_down(pgs)
2419
2420 def _get_num_active_down(self, pgs):
2421 num = 0
2422 for pg in pgs:
2423 if ((pg['state'].count('active') and not
2424 pg['state'].count('stale')) or
2425 (pg['state'].count('down') and not
2426 pg['state'].count('stale')) or
2427 (pg['state'].count('incomplete') and not
2428 pg['state'].count('stale'))):
2429 num += 1
2430 return num
2431
2432 def get_num_peered(self):
2433 """
2434 Find the number of PGs that are peered
2435 """
2436 pgs = self.get_pg_stats()
2437 return self._get_num_peered(pgs)
2438
2439 def _get_num_peered(self, pgs):
2440 num = 0
2441 for pg in pgs:
2442 if pg['state'].count('peered') and not pg['state'].count('stale'):
2443 num += 1
2444 return num
2445
2446 def is_clean(self):
2447 """
2448 True if all pgs are clean
2449 """
2450 pgs = self.get_pg_stats()
2451 return self._get_num_active_clean(pgs) == len(pgs)
2452
2453 def is_recovered(self):
2454 """
2455 True if all pgs have recovered
2456 """
2457 pgs = self.get_pg_stats()
2458 return self._get_num_active_recovered(pgs) == len(pgs)
2459
2460 def is_active_or_down(self):
2461 """
2462 True if all pgs are active or down
2463 """
2464 pgs = self.get_pg_stats()
2465 return self._get_num_active_down(pgs) == len(pgs)
2466
2467 def dump_pgs_not_active_clean(self):
2468 """
2469 Dumps all pgs that are not active+clean
2470 """
2471 pgs = self.get_pg_stats()
2472 for pg in pgs:
2473 if pg['state'] != 'active+clean':
2474 self.log('PG %s is not active+clean' % pg['pgid'])
2475 self.log(pg)
2476
2477 def dump_pgs_not_active_down(self):
2478 """
2479 Dumps all pgs that are not active or down
2480 """
2481 pgs = self.get_pg_stats()
2482 for pg in pgs:
2483 if 'active' not in pg['state'] and 'down' not in pg['state']:
2484 self.log('PG %s is not active or down' % pg['pgid'])
2485 self.log(pg)
2486
2487 def dump_pgs_not_active(self):
2488 """
2489 Dumps all pgs that are not active
2490 """
2491 pgs = self.get_pg_stats()
2492 for pg in pgs:
2493 if 'active' not in pg['state']:
2494 self.log('PG %s is not active' % pg['pgid'])
2495 self.log(pg)
2496
2497 def wait_for_clean(self, timeout=1200):
2498 """
2499 Block until all pgs are clean (or assert if the timeout expires).
2500 """
2501 self.log("waiting for clean")
2502 start = time.time()
2503 num_active_clean = self.get_num_active_clean()
2504 while not self.is_clean():
2505 if timeout is not None:
2506 if self.get_is_making_recovery_progress():
2507 self.log("making progress, resetting timeout")
2508 start = time.time()
2509 else:
2510 self.log("no progress seen, keeping timeout for now")
2511 if time.time() - start >= timeout:
2512 self.log('dumping pgs not clean')
2513 self.dump_pgs_not_active_clean()
2514 assert time.time() - start < timeout, \
2515 'wait_for_clean: failed before timeout expired'
2516 cur_active_clean = self.get_num_active_clean()
2517 if cur_active_clean != num_active_clean:
2518 start = time.time()
2519 num_active_clean = cur_active_clean
2520 time.sleep(3)
2521 self.log("clean!")
2522
2523 def are_all_osds_up(self):
2524 """
2525 Returns true if all osds are up.
2526 """
2527 x = self.get_osd_dump()
2528 return all(osd['up'] > 0 for osd in x)
2529
2530 def wait_for_all_osds_up(self, timeout=None):
2531 """
2532 When this exits, either the timeout has expired, or all
2533 osds are up.
2534 """
2535 self.log("waiting for all up")
2536 start = time.time()
2537 while not self.are_all_osds_up():
2538 if timeout is not None:
2539 assert time.time() - start < timeout, \
2540 'timeout expired in wait_for_all_osds_up'
2541 time.sleep(3)
2542 self.log("all up!")
2543
2544 def pool_exists(self, pool):
2545 """
True if the named pool currently exists in the cluster.
"""
2546 return pool in self.list_pools()
2548
2549 def wait_for_pool(self, pool, timeout=300):
2550 """
2551 Wait for a pool to exist
2552 """
2553 self.log('waiting for pool %s to exist' % pool)
2554 start = time.time()
2555 while not self.pool_exists(pool):
2556 if timeout is not None:
2557 assert time.time() - start < timeout, \
2558 'timeout expired in wait_for_pool'
2559 time.sleep(3)
2560
2561 def wait_for_pools(self, pools):
2562 for pool in pools:
2563 self.wait_for_pool(pool)
2564
2565 def is_mgr_available(self):
2566 x = self.get_mgr_dump()
2567 return x.get('available', False)
2568
2569 def wait_for_mgr_available(self, timeout=None):
2570 self.log("waiting for mgr available")
2571 start = time.time()
2572 while not self.is_mgr_available():
2573 if timeout is not None:
2574 assert time.time() - start < timeout, \
2575 'timeout expired in wait_for_mgr_available'
2576 time.sleep(3)
2577 self.log("mgr available!")
2578
2579 def wait_for_recovery(self, timeout=None):
2580 """
2581 Check peering. When this exits, we have recovered.
2582 """
2583 self.log("waiting for recovery to complete")
2584 start = time.time()
2585 num_active_recovered = self.get_num_active_recovered()
2586 while not self.is_recovered():
2587 now = time.time()
2588 if timeout is not None:
2589 if self.get_is_making_recovery_progress():
2590 self.log("making progress, resetting timeout")
2591 start = time.time()
2592 else:
2593 self.log("no progress seen, keeping timeout for now")
2594 if now - start >= timeout:
2595 if self.is_recovered():
2596 break
2597 self.log('dumping pgs not recovered yet')
2598 self.dump_pgs_not_active_clean()
2599 assert now - start < timeout, \
2600 'wait_for_recovery: failed before timeout expired'
2601 cur_active_recovered = self.get_num_active_recovered()
2602 if cur_active_recovered != num_active_recovered:
2603 start = time.time()
2604 num_active_recovered = cur_active_recovered
2605 time.sleep(3)
2606 self.log("recovered!")
2607
2608 def wait_for_active(self, timeout=None):
2609 """
2610 Check peering. When this exits, we are definitely active
2611 """
2612 self.log("waiting for peering to complete")
2613 start = time.time()
2614 num_active = self.get_num_active()
2615 while not self.is_active():
2616 if timeout is not None:
2617 if time.time() - start >= timeout:
2618 self.log('dumping pgs not active')
2619 self.dump_pgs_not_active()
2620 assert time.time() - start < timeout, \
2621 'wait_for_active: failed before timeout expired'
2622 cur_active = self.get_num_active()
2623 if cur_active != num_active:
2624 start = time.time()
2625 num_active = cur_active
2626 time.sleep(3)
2627 self.log("active!")
2628
2629 def wait_for_active_or_down(self, timeout=None):
2630 """
2631 Check peering. When this exits, we are definitely either
2632 active or down
2633 """
2634 self.log("waiting for peering to complete or become blocked")
2635 start = time.time()
2636 num_active_down = self.get_num_active_down()
2637 while not self.is_active_or_down():
2638 if timeout is not None:
2639 if time.time() - start >= timeout:
2640 self.log('dumping pgs not active or down')
2641 self.dump_pgs_not_active_down()
2642 assert time.time() - start < timeout, \
2643 'wait_for_active_or_down: failed before timeout expired'
2644 cur_active_down = self.get_num_active_down()
2645 if cur_active_down != num_active_down:
2646 start = time.time()
2647 num_active_down = cur_active_down
2648 time.sleep(3)
2649 self.log("active or down!")
2650
2651 def osd_is_up(self, osd):
2652 """
2653 Wrapper for osd check
2654 """
2655 osds = self.get_osd_dump()
2656 return osds[osd]['up'] > 0
2657
2658 def wait_till_osd_is_up(self, osd, timeout=None):
2659 """
2660 Loop waiting for osd.
2661 """
2662 self.log('waiting for osd.%d to be up' % osd)
2663 start = time.time()
2664 while not self.osd_is_up(osd):
2665 if timeout is not None:
2666 assert time.time() - start < timeout, \
2667 'osd.%d failed to come up before timeout expired' % osd
2668 time.sleep(3)
2669 self.log('osd.%d is up' % osd)
2670
2671 def is_active(self):
2672 """
2673 Wrapper to check if all pgs are active
2674 """
2675 return self.get_num_active() == self.get_num_pgs()
2676
2677 def all_active_or_peered(self):
2678 """
2679 Wrapper to check if all PGs are active or peered
2680 """
2681 pgs = self.get_pg_stats()
2682 return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)
2683
2684 def wait_till_active(self, timeout=None):
2685 """
2686 Wait until all pgs are active.
2687 """
2688 self.log("waiting till active")
2689 start = time.time()
2690 while not self.is_active():
2691 if timeout is not None:
2692 if time.time() - start >= timeout:
2693 self.log('dumping pgs not active')
2694 self.dump_pgs_not_active()
2695 assert time.time() - start < timeout, \
2696 'wait_till_active: failed before timeout expired'
2697 time.sleep(3)
2698 self.log("active!")
2699
2700 def wait_till_pg_convergence(self, timeout=None):
2701 start = time.time()
2702 old_stats = None
2703 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2704 if osd['in'] and osd['up']]
2705 while True:
2706 # strictly speaking, there is no need to wait for the mon. but due to
2707 # the "ms inject socket failures" setting, the osdmap could be delayed,
2708 # so the mgr is likely to ignore pg-stat messages for pgs serving
2709 # newly created pools that it does not yet know about. so, to make
2710 # sure the mgr is updated with the latest pg-stats, waiting for
2711 # mon/mgr is necessary.
2712 self.flush_pg_stats(active_osds)
2713 new_stats = dict((stat['pgid'], stat['state'])
2714 for stat in self.get_pg_stats())
2715 if old_stats == new_stats:
2716 return old_stats
2717 if timeout is not None:
2718 assert time.time() - start < timeout, \
2719 'failed to reach convergence before %d secs' % timeout
2720 old_stats = new_stats
2721 # longer than mgr_stats_period
2722 time.sleep(5 + 1)
2723
2724 def mark_out_osd(self, osd):
2725 """
2726 Wrapper to mark osd out.
2727 """
2728 self.raw_cluster_cmd('osd', 'out', str(osd))
2729
2730 def kill_osd(self, osd):
2731 """
2732 Kill osds by either power cycling (if indicated by the config)
2733 or by stopping.
2734 """
2735 if self.config.get('powercycle'):
2736 remote = self.find_remote('osd', osd)
2737 self.log('kill_osd on osd.{o} '
2738 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2739 self._assert_ipmi(remote)
2740 remote.console.power_off()
2741 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2742 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2743 self.inject_args(
2744 'osd', osd,
2745 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
2746 try:
2747 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2748 except:
2749 pass
2750 else:
2751 raise RuntimeError('osd.%s did not fail' % osd)
2752 else:
2753 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2754 else:
2755 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2756
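# Hedged config sketch: the powercycle and bdev_inject_crash branches above
# are driven by the thrasher config, e.g. (values are illustrative, as used
# in some qa suites):
#
#   config = {'bdev_inject_crash': 2, 'bdev_inject_crash_probability': 0.5}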
2757 @staticmethod
2758 def _assert_ipmi(remote):
2759 assert remote.console.has_ipmi_credentials, (
2760 "powercycling requested but RemoteConsole is not "
2761 "initialized. Check ipmi config.")
2762
2763 def blackhole_kill_osd(self, osd):
2764 """
2765 Stop osd if nothing else works.
2766 """
2767 self.inject_args('osd', osd,
2768 'objectstore-blackhole', True)
2769 time.sleep(2)
2770 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2771
2772 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
2773 """
2774 Revive osds by either power cycling (if indicated by the config)
2775 or by restarting.
2776 """
2777 if self.config.get('powercycle'):
2778 remote = self.find_remote('osd', osd)
2779 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2780 format(o=osd, s=remote.name))
2781 self._assert_ipmi(remote)
2782 remote.console.power_on()
2783 if not remote.console.check_status(300):
2784 raise Exception('Failed to revive osd.{o} via ipmi'.
2785 format(o=osd))
2786 teuthology.reconnect(self.ctx, 60, [remote])
2787 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2788 self.make_admin_daemon_dir(remote)
2789 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2790 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2791
2792 if not skip_admin_check:
2793 # wait for dump_ops_in_flight; this command doesn't appear
2794 # until after the signal handler is installed and it is safe
2795 # to stop the osd again without making valgrind leak checks
2796 # unhappy. see #5924.
2797 self.wait_run_admin_socket('osd', osd,
2798 args=['dump_ops_in_flight'],
2799 timeout=timeout, stdout=DEVNULL)
2800
2801 def mark_down_osd(self, osd):
2802 """
2803 Cluster command wrapper
2804 """
2805 self.raw_cluster_cmd('osd', 'down', str(osd))
2806
2807 def mark_in_osd(self, osd):
2808 """
2809 Cluster command wrapper
2810 """
2811 self.raw_cluster_cmd('osd', 'in', str(osd))
2812
2813 def signal_osd(self, osd, sig, silent=False):
2814 """
2815 Wrapper to local get_daemon call which sends the given
2816 signal to the given osd.
2817 """
2818 self.ctx.daemons.get_daemon('osd', osd,
2819 self.cluster).signal(sig, silent=silent)
2820
2821 ## monitors
2822 def signal_mon(self, mon, sig, silent=False):
2823 """
2824 Wrapper to local get_daemon call
2825 """
2826 self.ctx.daemons.get_daemon('mon', mon,
2827 self.cluster).signal(sig, silent=silent)
2828
2829 def kill_mon(self, mon):
2830 """
2831 Kill the monitor by either power cycling (if the config says so),
2832 or by doing a stop.
2833 """
2834 if self.config.get('powercycle'):
2835 remote = self.find_remote('mon', mon)
2836 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2837 format(m=mon, s=remote.name))
2838 self._assert_ipmi(remote)
2839 remote.console.power_off()
2840 else:
2841 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2842
2843 def revive_mon(self, mon):
2844 """
2845 Restart by either power cycling (if the config says so),
2846 or by doing a normal restart.
2847 """
2848 if self.config.get('powercycle'):
2849 remote = self.find_remote('mon', mon)
2850 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2851 format(m=mon, s=remote.name))
2852 self._assert_ipmi(remote)
2853 remote.console.power_on()
2854 self.make_admin_daemon_dir(remote)
2855 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2856
2857 def revive_mgr(self, mgr):
2858 """
2859 Restart by either power cycling (if the config says so),
2860 or by doing a normal restart.
2861 """
2862 if self.config.get('powercycle'):
2863 remote = self.find_remote('mgr', mgr)
2864 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2865 format(m=mgr, s=remote.name))
2866 self._assert_ipmi(remote)
2867 remote.console.power_on()
2868 self.make_admin_daemon_dir(remote)
2869 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2870
2871 def get_mon_status(self, mon):
2872 """
2873 Extract all the monitor status information from the cluster
2874 """
2875 out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
2876 return json.loads(out)
2877
2878 def get_mon_quorum(self):
2879 """
2880 Extract monitor quorum information from the cluster
2881 """
2882 out = self.raw_cluster_cmd('quorum_status')
2883 j = json.loads(out)
2884 self.log('quorum_status is %s' % out)
2885 return j['quorum']
2886
2887 def wait_for_mon_quorum_size(self, size, timeout=300):
2888 """
2889 Loop until quorum size is reached.
2890 """
2891 self.log('waiting for quorum size %d' % size)
2892 start = time.time()
2893 while len(self.get_mon_quorum()) != size:
2894 if timeout is not None:
2895 assert time.time() - start < timeout, \
2896 ('failed to reach quorum size %d '
2897 'before timeout expired' % size)
2898 time.sleep(3)
2899 self.log("quorum is size %d" % size)
2900
2901 def get_mon_health(self, debug=False):
2902 """
2903 Extract all the monitor health information.
2904 """
2905 out = self.raw_cluster_cmd('health', '--format=json')
2906 if debug:
2907 self.log('health:\n{h}'.format(h=out))
2908 return json.loads(out)
2909
2910 def wait_until_healthy(self, timeout=None):
2911 self.log("wait_until_healthy")
2912 start = time.time()
2913 while self.get_mon_health()['status'] != 'HEALTH_OK':
2914 if timeout is not None:
2915 assert time.time() - start < timeout, \
2916 'timeout expired in wait_until_healthy'
2917 time.sleep(3)
2918 self.log("wait_until_healthy done")
2919
2920 def get_filepath(self):
2921 """
2922 Return path to osd data with {id} needing to be replaced
2923 """
2924 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2925
2926 def make_admin_daemon_dir(self, remote):
2927 """
2928 Create /var/run/ceph directory on remote site.
2929
2930 :param ctx: Context
2931 :param remote: Remote site
2932 """
2933 remote.run(args=['sudo',
2934 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2935
2936 def get_service_task_status(self, service, status_key):
2937 """
2938 Return daemon task status for a given ceph service.
2939
2940 :param service: ceph service (mds, osd, etc...)
2941 :param status_key: matching task status key
2942 """
2943 task_status = {}
2944 status = self.raw_cluster_status()
2945 try:
2946 for k, v in status['servicemap']['services'][service]['daemons'].items():
2947 ts = dict(v).get('task_status', None)
2948 if ts:
2949 task_status[k] = ts[status_key]
2950 except KeyError: # catches missing service and status key
2951 return {}
2952 self.log(task_status)
2953 return task_status
2954
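# Hedged example (the status key is an assumption based on typical mds task
# status, not taken from this file): fetch per-daemon scrub progress as a
# dict keyed by daemon name:
#
#   scrub = manager.get_service_task_status('mds', 'scrub status')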
2955 def utility_task(name):
2956 """
2957 Generate ceph_manager subtask corresponding to ceph_manager
2958 method name
2959 """
2960 def task(ctx, config):
2961 if config is None:
2962 config = {}
2963 args = config.get('args', [])
2964 kwargs = config.get('kwargs', {})
2965 cluster = config.get('cluster', 'ceph')
2966 fn = getattr(ctx.managers[cluster], name)
2967 fn(*args, **kwargs)
2968 return task
2969
2970 revive_osd = utility_task("revive_osd")
2971 revive_mon = utility_task("revive_mon")
2972 kill_osd = utility_task("kill_osd")
2973 kill_mon = utility_task("kill_mon")
2974 create_pool = utility_task("create_pool")
2975 remove_pool = utility_task("remove_pool")
2976 wait_for_clean = utility_task("wait_for_clean")
2977 flush_all_pg_stats = utility_task("flush_all_pg_stats")
2978 set_pool_property = utility_task("set_pool_property")
2979 do_pg_scrub = utility_task("do_pg_scrub")
2980 wait_for_pool = utility_task("wait_for_pool")
2981 wait_for_pools = utility_task("wait_for_pools")
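# These module-level callables let a teuthology job invoke CephManager
# methods directly as tasks. A hedged sketch of a yaml fragment (shown in
# a comment for illustration only; the pool name and pg count are made up):
#
#   tasks:
#   - ceph_manager.wait_for_clean:
#   - ceph_manager.create_pool:
#       args: ['newpool', 64]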