1"""
2ceph manager -- Thrasher and CephManager objects
3"""
7c673cae
FG
4from functools import wraps
5import contextlib
6import random
7import signal
8import time
9import gevent
10import base64
11import json
12import logging
13import threading
14import traceback
15import os
9f95a23c
TL
16import six
17
18from io import BytesIO
e306af50 19from six import StringIO
7c673cae
FG
20from teuthology import misc as teuthology
21from tasks.scrub import Scrubber
9f95a23c
TL
22from tasks.util.rados import cmd_erasure_code_profile
23from tasks.util import get_remote
7c673cae
FG
24from teuthology.contextutil import safe_while
25from teuthology.orchestra.remote import Remote
26from teuthology.orchestra import run
27from teuthology.exceptions import CommandFailedError
9f95a23c
TL
28from tasks.thrasher import Thrasher
29from six import StringIO
7c673cae
FG
30
31try:
32 from subprocess import DEVNULL # py3k
33except ImportError:
9f95a23c 34 DEVNULL = open(os.devnull, 'r+') # type: ignore
7c673cae
FG
35
36DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
37
38log = logging.getLogger(__name__)
39
# this is for cephadm clusters
def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
    testdir = teuthology.get_testdir(ctx)
    extra_args = []
    if name:
        extra_args = ['-n', name]
    return remote.run(
        args=[
            'sudo',
            ctx.cephadm,
            '--image', ctx.ceph[cluster_name].image,
            'shell',
        ] + extra_args + [
            '--fsid', ctx.ceph[cluster_name].fsid,
            '--',
        ] + args,
        **kwargs
    )
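
# Example (illustrative only, not part of the original file): running a ceph
# command inside the cephadm shell container for the default 'ceph' cluster
# could look like
#
#   shell(ctx, 'ceph', remote, args=['ceph', 'osd', 'tree'], stdout=BytesIO())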

def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
    conf_fp = BytesIO()
    ctx.ceph[cluster].conf.write(conf_fp)
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'tee', conf_path, run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
            run.Raw('>'), '/dev/null',

        ],
        stdin=run.PIPE,
        wait=False)
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)


def mount_osd_data(ctx, remote, cluster, osd):
    """
    Mount a remote OSD

    :param ctx: Context
    :param remote: Remote site
    :param cluster: name of ceph cluster
    :param osd: Osd name
    """
    log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
    role = "{0}.osd.{1}".format(cluster, osd)
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    if remote in ctx.disk_config.remote_to_roles_to_dev:
        if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
            role = alt_role
        if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
            return
        dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
        mount_options = ctx.disk_config.\
            remote_to_roles_to_dev_mount_options[remote][role]
        fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
        mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))

        log.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
                 'mountpoint: {p}, type: {t}, options: {v}'.format(
                     o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
                     c=cluster))

        remote.run(
            args=[
                'sudo',
                'mount',
                '-t', fstype,
                '-o', ','.join(mount_options),
                dev,
                mnt,
            ]
        )


def log_exc(func):
    @wraps(func)
    def wrapper(self):
        try:
            return func(self)
        except:
            self.log(traceback.format_exc())
            raise
    return wrapper


class PoolType:
    REPLICATED = 1
    ERASURE_CODED = 3


class OSDThrasher(Thrasher):
    """
    Object used to thrash Ceph
    """
    def __init__(self, manager, config, name, logger):
        super(OSDThrasher, self).__init__()

        self.ceph_manager = manager
        self.cluster = manager.cluster
        self.ceph_manager.wait_for_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.live_osds = osd_status['live']
        self.out_osds = osd_status['out']
        self.dead_osds = osd_status['dead']
        self.stopping = False
        self.logger = logger
        self.config = config
        self.name = name
        self.revive_timeout = self.config.get("revive_timeout", 360)
        self.pools_to_fix_pgp_num = set()
        if self.config.get('powercycle'):
            self.revive_timeout += 120
        self.clean_wait = self.config.get('clean_wait', 0)
        self.minin = self.config.get("min_in", 4)
        self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
        self.sighup_delay = self.config.get('sighup_delay')
        self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
        self.dump_ops_enable = self.config.get('dump_ops_enable')
        self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
        self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
        self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.random_eio = self.config.get('random_eio')
        self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)

        num_osds = self.in_osds + self.out_osds
        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
        self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
        if self.config is None:
            self.config = dict()
        # prevent monitor from auto-marking things out while thrasher runs
        # try both old and new tell syntax, in case we are testing old code
        self.saved_options = []
        # assuming that the default settings do not vary from one daemon to
        # another
        first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
        opts = [('mon', 'mon_osd_down_out_interval', 0)]
        #why do we disable marking an OSD out automatically? :/
        for service, opt, new_value in opts:
            old_value = manager.get_config(first_mon[0],
                                           first_mon[1],
                                           opt)
            self.saved_options.append((service, opt, old_value))
            manager.inject_args(service, '*', opt, new_value)
        # initialize ceph_objectstore_tool property - must be done before
        # do_thrash is spawned - http://tracker.ceph.com/issues/18799
        if (self.config.get('powercycle') or
            not self.cmd_exists_on_osds("ceph-objectstore-tool") or
            self.config.get('disable_objectstore_tool_tests', False)):
            self.ceph_objectstore_tool = False
            if self.config.get('powercycle'):
                self.log("Unable to test ceph-objectstore-tool, "
                         "powercycle testing")
            else:
                self.log("Unable to test ceph-objectstore-tool, "
                         "not available on all OSD nodes")
        else:
            self.ceph_objectstore_tool = \
                self.config.get('ceph_objectstore_tool', True)
        # spawn do_thrash
        self.thread = gevent.spawn(self.do_thrash)
        if self.sighup_delay:
            self.sighup_thread = gevent.spawn(self.do_sighup)
        if self.optrack_toggle_delay:
            self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
        if self.dump_ops_enable == "true":
            self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
        if self.noscrub_toggle_delay:
            self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)

    def log(self, msg, *args, **kwargs):
        self.logger.info(msg, *args, **kwargs)

    def cmd_exists_on_osds(self, cmd):
        if self.ceph_manager.cephadm:
            return True
        allremotes = self.ceph_manager.ctx.cluster.only(\
            teuthology.is_type('osd', self.cluster)).remotes.keys()
        allremotes = list(set(allremotes))
        for remote in allremotes:
            proc = remote.run(args=['type', cmd], wait=True,
                              check_status=False, stdout=BytesIO(),
                              stderr=BytesIO())
            if proc.exitstatus != 0:
                return False
        return True

    def run_ceph_objectstore_tool(self, remote, osd, cmd):
        if self.ceph_manager.cephadm:
            return shell(
                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
                args=['ceph-objectstore-tool'] + cmd,
                name=osd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())
        else:
            return remote.run(
                args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())

    def kill_osd(self, osd=None, mark_down=False, mark_out=False):
        """
        :param osd: Osd to be killed.
        :mark_down: Mark down if true.
        :mark_out: Mark out if true.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s" % (str(osd),
                                                       str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.kill_osd(osd)
        if mark_down:
            self.ceph_manager.mark_down_osd(osd)
        if mark_out and osd in self.in_osds:
            self.out_osd(osd)
        if self.ceph_objectstore_tool:
            self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
            remote = self.ceph_manager.find_remote('osd', osd)
            FSPATH = self.ceph_manager.get_filepath()
            JPATH = os.path.join(FSPATH, "journal")
            exp_osd = imp_osd = osd
            self.log('remote for osd %s is %s' % (osd, remote))
            exp_remote = imp_remote = remote
            # If an older osd is available we'll move a pg from there
            if (len(self.dead_osds) > 1 and
                    random.random() < self.chance_move_pg):
                exp_osd = random.choice(self.dead_osds[:-1])
                exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
                self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
            prefix = [
                '--no-mon-config',
                '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
            ]

            if not self.ceph_manager.cephadm:
                # ceph-objectstore-tool might be temporarily absent during an
                # upgrade - see http://tracker.ceph.com/issues/18014
                with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
                    while proceed():
                        proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
                                              wait=True, check_status=False, stdout=BytesIO(),
                                              stderr=BytesIO())
                        if proc.exitstatus == 0:
                            break
                        log.debug("ceph-objectstore-tool binary not present, trying again")

            # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
            # see http://tracker.ceph.com/issues/19556
            with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
                while proceed():
                    proc = self.run_ceph_objectstore_tool(
                        exp_remote, 'osd.%s' % exp_osd,
                        prefix + [
                            '--data-path', FSPATH.format(id=exp_osd),
                            '--journal-path', JPATH.format(id=exp_osd),
                            '--op', 'list-pgs',
                        ])
                    if proc.exitstatus == 0:
                        break
                    elif (proc.exitstatus == 1 and
                          proc.stderr.getvalue() == "OSD has the store locked"):
                        continue
                    else:
                        raise Exception("ceph-objectstore-tool: "
                                        "exp list-pgs failure with status {ret}".
                                        format(ret=proc.exitstatus))

            pgs = six.ensure_str(proc.stdout.getvalue()).split('\n')[:-1]
            if len(pgs) == 0:
                self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
                return
            pg = random.choice(pgs)
            #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
            #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
            exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
                                    "exp.{pg}.{id}".format(
                                        pg=pg,
                                        id=exp_osd))
            if self.ceph_manager.cephadm:
                exp_host_path = os.path.join(
                    '/var/log/ceph',
                    self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
                    "exp.{pg}.{id}".format(
                        pg=pg,
                        id=exp_osd))
            else:
                exp_host_path = exp_path

            # export
            # Can't use new export-remove op since this is part of upgrade testing
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                prefix + [
                    '--data-path', FSPATH.format(id=exp_osd),
                    '--journal-path', JPATH.format(id=exp_osd),
                    '--op', 'export',
                    '--pgid', pg,
                    '--file', exp_path,
                ])
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "export failure with status {ret}".
                                format(ret=proc.exitstatus))
            # remove
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                prefix + [
                    '--data-path', FSPATH.format(id=exp_osd),
                    '--journal-path', JPATH.format(id=exp_osd),
                    '--force',
                    '--op', 'remove',
                    '--pgid', pg,
                ])
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "remove failure with status {ret}".
                                format(ret=proc.exitstatus))
            # If there are at least 2 dead osds we might move the pg
            if exp_osd != imp_osd:
                # If pg isn't already on this osd, then we will move it there
                proc = self.run_ceph_objectstore_tool(
                    imp_remote,
                    'osd.%s' % imp_osd,
                    prefix + [
                        '--data-path', FSPATH.format(id=imp_osd),
                        '--journal-path', JPATH.format(id=imp_osd),
                        '--op', 'list-pgs',
                    ])
                if proc.exitstatus:
                    raise Exception("ceph-objectstore-tool: "
                                    "imp list-pgs failure with status {ret}".
                                    format(ret=proc.exitstatus))
                pgs = six.ensure_str(proc.stdout.getvalue()).split('\n')[:-1]
                if pg not in pgs:
                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
                             format(pg=pg, fosd=exp_osd, tosd=imp_osd))
                    if imp_remote != exp_remote:
                        # Copy export file to the other machine
                        self.log("Transfer export file from {srem} to {trem}".
                                 format(srem=exp_remote, trem=imp_remote))
                        # just in case an upgrade makes /var/log/ceph unreadable by non-root,
                        exp_remote.run(args=['sudo', 'chmod', '777',
                                             '/var/log/ceph'])
                        imp_remote.run(args=['sudo', 'chmod', '777',
                                             '/var/log/ceph'])
                        tmpexport = Remote.get_file(exp_remote, exp_host_path,
                                                    sudo=True)
                        if exp_host_path != exp_path:
                            # push to /var/log/ceph, then rename (we can't
                            # chmod 777 the /var/log/ceph/$fsid mountpoint)
                            Remote.put_file(imp_remote, tmpexport, exp_path)
                            imp_remote.run(args=[
                                'sudo', 'mv', exp_path, exp_host_path])
                        else:
                            Remote.put_file(imp_remote, tmpexport, exp_host_path)
                        os.remove(tmpexport)
                else:
                    # Can't move the pg after all
                    imp_osd = exp_osd
                    imp_remote = exp_remote
            # import
            proc = self.run_ceph_objectstore_tool(
                imp_remote, 'osd.%s' % imp_osd,
                [
                    '--data-path', FSPATH.format(id=imp_osd),
                    '--journal-path', JPATH.format(id=imp_osd),
                    '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
                    '--op', 'import',
                    '--file', exp_path,
                ])
            if proc.exitstatus == 1:
                bogosity = "The OSD you are using is older than the exported PG"
                if bogosity in proc.stderr.getvalue():
                    self.log("OSD older than exported PG"
                             "...ignored")
            elif proc.exitstatus == 10:
                self.log("Pool went away before processing an import"
                         "...ignored")
            elif proc.exitstatus == 11:
                self.log("Attempt to import an incompatible export"
                         "...ignored")
            elif proc.exitstatus == 12:
                # this should be safe to ignore because we only ever move 1
                # copy of the pg at a time, and merge is only initiated when
                # all replicas are peered and happy.  /me crosses fingers
                self.log("PG merged on target"
                         "...ignored")
            elif proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "import failure with status {ret}".
                                format(ret=proc.exitstatus))
            cmd = "sudo rm -f {file}".format(file=exp_host_path)
            exp_remote.run(args=cmd)
            if imp_remote != exp_remote:
                imp_remote.run(args=cmd)

            # apply low split settings to each pool
            if not self.ceph_manager.cephadm:
                for pool in self.ceph_manager.list_pools():
                    cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
                           "--filestore-split-multiple 1' sudo -E "
                           + 'ceph-objectstore-tool '
                           + ' '.join(prefix + [
                               '--data-path', FSPATH.format(id=imp_osd),
                               '--journal-path', JPATH.format(id=imp_osd),
                           ])
                           + " --op apply-layout-settings --pool " + pool).format(id=osd)
                    proc = imp_remote.run(args=cmd,
                                          wait=True, check_status=False,
                                          stderr=StringIO())
                    if 'Couldn\'t find pool' in proc.stderr.getvalue():
                        continue
                    if proc.exitstatus:
                        raise Exception("ceph-objectstore-tool apply-layout-settings"
                                        " failed with {status}".format(status=proc.exitstatus))


    def blackhole_kill_osd(self, osd=None):
        """
        If all else fails, kill the osd.
        :param osd: Osd to be killed.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Blackholing and then killing osd %s, live_osds are %s" %
                 (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.blackhole_kill_osd(osd)

    def revive_osd(self, osd=None, skip_admin_check=False):
        """
        Revive the osd.
        :param osd: Osd to be revived.
        """
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % (str(osd),))
        self.ceph_manager.revive_osd(
            osd,
            self.revive_timeout,
            skip_admin_check=skip_admin_check)
        self.dead_osds.remove(osd)
        self.live_osds.append(osd)
        if self.random_eio > 0 and osd == self.rerrosd:
            self.ceph_manager.set_config(self.rerrosd,
                                         filestore_debug_random_read_err = self.random_eio)
            self.ceph_manager.set_config(self.rerrosd,
                                         bluestore_debug_random_read_err = self.random_eio)


    def out_osd(self, osd=None):
        """
        Mark the osd out
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s" %
                 (str(osd), str(self.in_osds)))
        self.ceph_manager.mark_out_osd(osd)
        self.in_osds.remove(osd)
        self.out_osds.append(osd)

    def in_osd(self, osd=None):
        """
        Mark the osd in
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.out_osds)
        if osd in self.dead_osds:
            return self.revive_osd(osd)
        self.log("Adding osd %s" % (str(osd),))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)
        self.log("Added osd %s" % (str(osd),))

    def reweight_osd_or_by_util(self, osd=None):
        """
        Reweight an osd that is in
        :param osd: Osd to be marked.
        """
        if osd is not None or random.choice([True, False]):
            if osd is None:
                osd = random.choice(self.in_osds)
            val = random.uniform(.1, 1.0)
            self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(val))
        else:
            # do it several times, the option space is large
            for i in range(5):
                options = {
                    'max_change': random.choice(['0.05', '1.0', '3.0']),
                    'overage': random.choice(['110', '1000']),
                    'type': random.choice([
                        'reweight-by-utilization',
                        'test-reweight-by-utilization']),
                }
                self.log("Reweighting by: %s"%(str(options),))
                self.ceph_manager.raw_cluster_cmd(
                    'osd',
                    options['type'],
                    options['overage'],
                    options['max_change'])

    def primary_affinity(self, osd=None):
        if osd is None:
            osd = random.choice(self.in_osds)
        if random.random() >= .5:
            pa = random.random()
        elif random.random() >= .5:
            pa = 1
        else:
            pa = 0
        self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                          str(osd), str(pa))

    def thrash_cluster_full(self):
        """
        Set and unset cluster full condition
        """
        self.log('Setting full ratio to .001')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
        time.sleep(1)
        self.log('Setting full ratio back to .95')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')

    def thrash_pg_upmap(self):
        """
        Install or remove random pg_upmap entries in OSDMap
        """
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap, ignoring')

    def thrash_pg_upmap_items(self):
        """
        Install or remove random pg_upmap_items entries in OSDMap
        """
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n*2]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap_items']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap-items',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap-items, ignoring')

    def force_recovery(self):
        """
        Force recovery on some PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to force backfill|recovery, ignoring')


    def cancel_force_recovery(self):
        """
        Cancel forced recovery on some PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to cancel force backfill|recovery, ignoring')

    def force_cancel_recovery(self):
        """
        Force or cancel forcing recovery
        """
        if random.random() >= 0.4:
            self.force_recovery()
        else:
            self.cancel_force_recovery()

    def all_up(self):
        """
        Make sure all osds are up and not out.
        """
        while len(self.dead_osds) > 0:
            self.log("reviving osd")
            self.revive_osd()
        while len(self.out_osds) > 0:
            self.log("inning osd")
            self.in_osd()

    def all_up_in(self):
        """
        Make sure all osds are up and fully in.
        """
        self.all_up()
        for osd in self.live_osds:
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(1))
            self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                              str(osd), str(1))

    def do_join(self):
        """
        Break out of this Ceph loop
        """
        self.stopping = True
        self.thread.get()
        if self.sighup_delay:
            self.log("joining the do_sighup greenlet")
            self.sighup_thread.get()
        if self.optrack_toggle_delay:
            self.log("joining the do_optrack_toggle greenlet")
            self.optrack_toggle_thread.join()
        if self.dump_ops_enable == "true":
            self.log("joining the do_dump_ops greenlet")
            self.dump_ops_thread.join()
        if self.noscrub_toggle_delay:
            self.log("joining the do_noscrub_toggle greenlet")
            self.noscrub_toggle_thread.join()

    def grow_pool(self):
        """
        Increase the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        if pool is None:
            return
        self.log("Growing pool %s" % (pool,))
        if self.ceph_manager.expand_pool(pool,
                                         self.config.get('pool_grow_by', 10),
                                         self.max_pgs):
            self.pools_to_fix_pgp_num.add(pool)

    def shrink_pool(self):
        """
        Decrease the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        if pool is None:
            return
        _ = self.ceph_manager.get_pool_pg_num(pool)
        self.log("Shrinking pool %s" % (pool,))
        if self.ceph_manager.contract_pool(
                pool,
                self.config.get('pool_shrink_by', 10),
                self.min_pgs):
            self.pools_to_fix_pgp_num.add(pool)

    def fix_pgp_num(self, pool=None):
        """
        Fix number of pgs in pool.
        """
        if pool is None:
            pool = self.ceph_manager.get_pool()
            if not pool:
                return
            force = False
        else:
            force = True
        self.log("fixing pg num pool %s" % (pool,))
        if self.ceph_manager.set_pool_pgpnum(pool, force):
            self.pools_to_fix_pgp_num.discard(pool)

    def test_pool_min_size(self):
        """
        Loop to selectively push PGs below their min_size and test that recovery
        still occurs.
        """
        self.log("test_pool_min_size")
        self.all_up()
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
        )

        minout = int(self.config.get("min_out", 1))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 1))
        self.log("doing min_size thrashing")
        self.ceph_manager.wait_for_clean(timeout=60)
        assert self.ceph_manager.is_clean(), \
            'not clean before minsize thrashing starts'
        while not self.stopping:
            # look up k and m from all the pools on each loop, in case it
            # changes as the cluster runs
            k = 0
            m = 99
            has_pools = False
            pools_json = self.ceph_manager.get_osd_dump_json()['pools']

            for pool_json in pools_json:
                pool = pool_json['pool_name']
                has_pools = True
                pool_type = pool_json['type']  # 1 for rep, 3 for ec
                min_size = pool_json['min_size']
                self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
                try:
                    ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
                    if pool_type != PoolType.ERASURE_CODED:
                        continue
                    ec_profile = pool_json['erasure_code_profile']
                    ec_profile_json = self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'erasure-code-profile',
                        'get',
                        ec_profile,
                        '--format=json')
                    ec_json = json.loads(ec_profile_json)
                    local_k = int(ec_json['k'])
                    local_m = int(ec_json['m'])
                    self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
                                                                          k=local_k, m=local_m))
                    if local_k > k:
                        self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
                        k = local_k
                    if local_m < m:
                        self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
                        m = local_m
                except CommandFailedError:
                    self.log("failed to read erasure_code_profile. %s was likely removed", pool)
                    continue

            if has_pools:
                self.log("using k={k}, m={m}".format(k=k,m=m))
            else:
                self.log("No pools yet, waiting")
                time.sleep(5)
                continue

            if minout > len(self.out_osds): # kill OSDs and mark out
                self.log("forced to out an osd")
                self.kill_osd(mark_out=True)
                continue
            elif mindead > len(self.dead_osds): # kill OSDs but force timeout
                self.log("forced to kill an osd")
                self.kill_osd()
                continue
            else: # make mostly-random choice to kill or revive OSDs
                minup = max(minlive, k)
                rand_val = random.uniform(0, 1)
                self.log("choosing based on number of live OSDs and rand val {rand}".\
                         format(rand=rand_val))
                if len(self.live_osds) > minup+1 and rand_val < 0.5:
                    # chose to knock out as many OSDs as we can w/out downing PGs

                    most_killable = min(len(self.live_osds) - minup, m)
                    self.log("chose to kill {n} OSDs".format(n=most_killable))
                    for i in range(1, most_killable):
                        self.kill_osd(mark_out=True)
                    time.sleep(10)
                    # try a few times since there might be a concurrent pool
                    # creation or deletion
                    with safe_while(
                            sleep=5, tries=5,
                            action='check for active or peered') as proceed:
                        while proceed():
                            if self.ceph_manager.all_active_or_peered():
                                break
                            self.log('not all PGs are active or peered')
                else: # chose to revive OSDs, bring up a random fraction of the dead ones
                    self.log("chose to revive osds")
                    for i in range(1, int(rand_val * len(self.dead_osds))):
                        self.revive_osd(i)

                # let PGs repair themselves or our next knockout might kill one
                self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))

        # / while not self.stopping
        self.all_up_in()

        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
        )

    def inject_pause(self, conf_key, duration, check_after, should_be_down):
        """
        Pause injection testing. Check for osd being down when finished.
        """
        the_one = random.choice(self.live_osds)
        self.log("inject_pause on {osd}".format(osd=the_one))
        self.log(
            "Testing {key} pause injection for duration {duration}".format(
                key=conf_key,
                duration=duration
            ))
        self.log(
            "Checking after {after}, should_be_down={shouldbedown}".format(
                after=check_after,
                shouldbedown=should_be_down
            ))
        self.ceph_manager.set_config(the_one, **{conf_key: duration})
        if not should_be_down:
            return
        time.sleep(check_after)
        status = self.ceph_manager.get_osd_status()
        assert the_one in status['down']
        time.sleep(duration - check_after + 20)
        status = self.ceph_manager.get_osd_status()
        assert not the_one in status['down']

    def test_backfill_full(self):
        """
        Test backfills stopping when the replica fills up.

        First, use injectfull admin command to simulate a now full
        osd by setting it to 0 on all of the OSDs.

        Second, on a random subset, set
        osd_debug_skip_full_check_in_backfill_reservation to force
        the more complicated check in do_scan to be exercised.

        Then, verify that all backfillings stop.
        """
        self.log("injecting backfill full")
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation=
                random.choice(['false', 'true']))
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
        for i in range(30):
            status = self.ceph_manager.compile_pg_status()
            if 'backfilling' not in status.keys():
                break
            self.log(
                "waiting for {still_going} backfillings".format(
                    still_going=status.get('backfilling')))
            time.sleep(1)
        assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation='false')
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
                                               check_status=True, timeout=30, stdout=DEVNULL)

    def test_map_discontinuity(self):
        """
        1) Allows the osds to recover
        2) kills an osd
        3) allows the remaining osds to recover
        4) waits for some time
        5) revives the osd
        This sequence should cause the revived osd to have to handle
        a map gap since the mons would have trimmed
        """
        while len(self.in_osds) < (self.minin + 1):
            self.in_osd()
        self.log("Waiting for recovery")
        self.ceph_manager.wait_for_all_osds_up(
            timeout=self.config.get('timeout')
        )
        # now we wait 20s for the pg status to change, if it takes longer,
        # the test *should* fail!
        time.sleep(20)
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
        )

        # now we wait 20s for the backfill replicas to hear about the clean
        time.sleep(20)
        self.log("Recovered, killing an osd")
        self.kill_osd(mark_down=True, mark_out=True)
        self.log("Waiting for clean again")
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
        )
        self.log("Waiting for trim")
        time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
        self.revive_osd()

    def choose_action(self):
        """
        Random action selector.
        """
        chance_down = self.config.get('chance_down', 0.4)
        _ = self.config.get('chance_test_min_size', 0)
        chance_test_backfill_full = \
            self.config.get('chance_test_backfill_full', 0)
        if isinstance(chance_down, int):
            chance_down = float(chance_down) / 100
        minin = self.minin
        minout = int(self.config.get("min_out", 0))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 0))

        self.log('choose_action: min_in %d min_out '
                 '%d min_live %d min_dead %d' %
                 (minin, minout, minlive, mindead))
        actions = []
        if len(self.in_osds) > minin:
            actions.append((self.out_osd, 1.0,))
        if len(self.live_osds) > minlive and chance_down > 0:
            actions.append((self.kill_osd, chance_down,))
        if len(self.out_osds) > minout:
            actions.append((self.in_osd, 1.7,))
        if len(self.dead_osds) > mindead:
            actions.append((self.revive_osd, 1.0,))
        if self.config.get('thrash_primary_affinity', True):
            actions.append((self.primary_affinity, 1.0,))
        actions.append((self.reweight_osd_or_by_util,
                        self.config.get('reweight_osd', .5),))
        actions.append((self.grow_pool,
                        self.config.get('chance_pgnum_grow', 0),))
        actions.append((self.shrink_pool,
                        self.config.get('chance_pgnum_shrink', 0),))
        actions.append((self.fix_pgp_num,
                        self.config.get('chance_pgpnum_fix', 0),))
        actions.append((self.test_pool_min_size,
                        self.config.get('chance_test_min_size', 0),))
        actions.append((self.test_backfill_full,
                        chance_test_backfill_full,))
        if self.chance_thrash_cluster_full > 0:
            actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
        if self.chance_thrash_pg_upmap > 0:
            actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
        if self.chance_thrash_pg_upmap_items > 0:
            actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
        if self.chance_force_recovery > 0:
            actions.append((self.force_cancel_recovery, self.chance_force_recovery))

        for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
            for scenario in [
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_short', 3),
                                   0,
                                   False),
                 self.config.get('chance_inject_pause_short', 1),),
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_long', 80),
                                   self.config.get('pause_check_after', 70),
                                   True),
                 self.config.get('chance_inject_pause_long', 0),)]:
                actions.append(scenario)

        total = sum([y for (x, y) in actions])
        val = random.uniform(0, total)
        for (action, prob) in actions:
            if val < prob:
                return action
            val -= prob
        return None

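    # Note on choose_action() (illustrative, not part of the original file):
    # the pick above is weighted. For example, with actions
    # [(out_osd, 1.0), (kill_osd, 0.4)] the total weight is 1.4, so a uniform
    # draw in [0, 1.0) selects out_osd and a draw in [1.0, 1.4) selects
    # kill_osd.
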
    def do_thrash(self):
        """
        _do_thrash() wrapper.
        """
        try:
            self._do_thrash()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # Allow successful completion so gevent doesn't see an exception.
            # The DaemonWatchdog will observe the error and tear down the test.

    @log_exc
    def do_sighup(self):
        """
        Loops and sends signal.SIGHUP to a random live osd.

        Loop delay is controlled by the config value sighup_delay.
        """
        delay = float(self.sighup_delay)
        self.log("starting do_sighup with a delay of {0}".format(delay))
        while not self.stopping:
            osd = random.choice(self.live_osds)
            self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
            time.sleep(delay)

    @log_exc
    def do_optrack_toggle(self):
        """
        Loops and toggle op tracking to all osds.

        Loop delay is controlled by the config value optrack_toggle_delay.
        """
        delay = float(self.optrack_toggle_delay)
        osd_state = "true"
        self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if osd_state == "true":
                osd_state = "false"
            else:
                osd_state = "true"
            try:
                self.ceph_manager.inject_args('osd', '*',
                                              'osd_enable_op_tracker',
                                              osd_state)
            except CommandFailedError:
                self.log('Failed to tell all osds, ignoring')
            gevent.sleep(delay)

    @log_exc
    def do_dump_ops(self):
        """
        Loops and does op dumps on all osds
        """
        self.log("starting do_dump_ops")
        while not self.stopping:
            for osd in self.live_osds:
                # Ignore errors because live_osds is in flux
                self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
            gevent.sleep(0)

    @log_exc
    def do_noscrub_toggle(self):
        """
        Loops and toggle noscrub flags

        Loop delay is controlled by the config value noscrub_toggle_delay.
        """
        delay = float(self.noscrub_toggle_delay)
        scrub_state = "none"
        self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if scrub_state == "none":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
                scrub_state = "noscrub"
            elif scrub_state == "noscrub":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
                scrub_state = "both"
            elif scrub_state == "both":
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
                scrub_state = "nodeep-scrub"
            else:
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
                scrub_state = "none"
            gevent.sleep(delay)
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')

    @log_exc
    def _do_thrash(self):
        """
        Loop to select random actions to thrash ceph manager with.
        """
        cleanint = self.config.get("clean_interval", 60)
        scrubint = self.config.get("scrub_interval", -1)
        maxdead = self.config.get("max_dead", 0)
        delay = self.config.get("op_delay", 5)
        self.rerrosd = self.live_osds[0]
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err',
                                          self.random_eio)
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err',
                                          self.random_eio)
        self.log("starting do_thrash")
        while not self.stopping:
            to_log = [str(x) for x in ["in_osds: ", self.in_osds,
                                       "out_osds: ", self.out_osds,
                                       "dead_osds: ", self.dead_osds,
                                       "live_osds: ", self.live_osds]]
            self.log(" ".join(to_log))
            if random.uniform(0, 1) < (float(delay) / cleanint):
                while len(self.dead_osds) > maxdead:
                    self.revive_osd()
                for osd in self.in_osds:
                    self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                      str(osd), str(1))
                if random.uniform(0, 1) < float(
                        self.config.get('chance_test_map_discontinuity', 0)) \
                        and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
                    self.test_map_discontinuity()
                else:
                    self.ceph_manager.wait_for_recovery(
                        timeout=self.config.get('timeout')
                    )
                time.sleep(self.clean_wait)
                if scrubint > 0:
                    if random.uniform(0, 1) < (float(delay) / scrubint):
                        self.log('Scrubbing while thrashing being performed')
                        Scrubber(self.ceph_manager, self.config)
            self.choose_action()()
            time.sleep(delay)
        self.all_up()
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err', '0.0')
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err', '0.0')
        for pool in list(self.pools_to_fix_pgp_num):
            if self.ceph_manager.get_pool_pg_num(pool) > 0:
                self.fix_pgp_num(pool)
        self.pools_to_fix_pgp_num.clear()
        for service, opt, saved_value in self.saved_options:
            self.ceph_manager.inject_args(service, '*', opt, saved_value)
        self.saved_options = []
        self.all_up_in()


class ObjectStoreTool:

    def __init__(self, manager, pool, **kwargs):
        self.manager = manager
        self.pool = pool
        self.osd = kwargs.get('osd', None)
        self.object_name = kwargs.get('object_name', None)
        self.do_revive = kwargs.get('do_revive', True)
        if self.osd and self.pool and self.object_name:
            if self.osd == "primary":
                self.osd = self.manager.get_object_primary(self.pool,
                                                           self.object_name)
        assert self.osd
        if self.object_name:
            self.pgid = self.manager.get_object_pg_with_shard(self.pool,
                                                              self.object_name,
                                                              self.osd)
        self.remote = next(iter(self.manager.ctx.\
            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
        path = self.manager.get_filepath().format(id=self.osd)
        self.paths = ("--data-path {path} --journal-path {path}/journal".
                      format(path=path))

    def build_cmd(self, options, args, stdin):
        lines = []
        if self.object_name:
            lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
                         "{paths} --pgid {pgid} --op list |"
                         "grep '\"oid\":\"{name}\"')".
                         format(paths=self.paths,
                                pgid=self.pgid,
                                name=self.object_name))
            args = '"$object" ' + args
            options += " --pgid {pgid}".format(pgid=self.pgid)
        cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
               format(paths=self.paths,
                      args=args,
                      options=options))
        if stdin:
            cmd = ("echo {payload} | base64 --decode | {cmd}".
                   format(payload=base64.encode(stdin),
                          cmd=cmd))
        lines.append(cmd)
        return "\n".join(lines)

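    # Rough shape of what build_cmd() returns when an object name is set
    # (illustrative sketch only; <paths>, <pgid>, <name> etc. are placeholders,
    # not values from the original file):
    #
    #   object=$(sudo adjust-ulimits ceph-objectstore-tool <paths> --pgid <pgid> --op list |grep '"oid":"<name>"')
    #   sudo adjust-ulimits ceph-objectstore-tool <paths> <options> --pgid <pgid> "$object" <args>
    #
    # If stdin is supplied, the second command is additionally wrapped as
    # "echo <payload> | base64 --decode | ...".
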
    def run(self, options, args, stdin=None, stdout=None):
        if stdout is None:
            stdout = BytesIO()
        self.manager.kill_osd(self.osd)
        cmd = self.build_cmd(options, args, stdin)
        self.manager.log(cmd)
        try:
            proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
                                   check_status=False,
                                   stdout=stdout,
                                   stderr=BytesIO())
            proc.wait()
            if proc.exitstatus != 0:
                self.manager.log("failed with " + str(proc.exitstatus))
                error = six.ensure_str(proc.stdout.getvalue()) + " " + \
                        six.ensure_str(proc.stderr.getvalue())
                raise Exception(error)
        finally:
            if self.do_revive:
                self.manager.revive_osd(self.osd)
                self.manager.wait_till_osd_is_up(self.osd, 300)


# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
# the same name.
class CephManager:
    """
    Ceph manager object.
    Contains several local functions that form the bulk of this module.

    :param controller: the remote machine where the Ceph commands should be
                       executed
    :param ctx: the cluster context
    :param config: path to Ceph config file
    :param logger: for logging messages
    :param cluster: name of the Ceph cluster
    """

    def __init__(self, controller, ctx=None, config=None, logger=None,
                 cluster='ceph', cephadm=False):
        self.lock = threading.RLock()
        self.ctx = ctx
        self.config = config
        self.controller = controller
        self.next_pool_id = 0
        self.cluster = cluster
        self.cephadm = cephadm
        if (logger):
            self.log = lambda x: logger.info(x)
        else:
            def tmp(x):
                """
                implement log behavior.
                """
                print(x)
            self.log = tmp
        if self.config is None:
            self.config = dict()
        pools = self.list_pools()
        self.pools = {}
        for pool in pools:
            # we may race with a pool deletion; ignore failures here
            try:
                self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
            except CommandFailedError:
                self.log('Failed to get pg_num from pool %s, ignoring' % pool)

    def raw_cluster_cmd(self, *args):
        """
        Start ceph on a raw cluster.  Return count
        """
        if self.cephadm:
            proc = shell(self.ctx, self.cluster, self.controller,
                         args=['ceph'] + list(args),
                         stdout=BytesIO())
        else:
            testdir = teuthology.get_testdir(self.ctx)
            ceph_args = [
                'sudo',
                'adjust-ulimits',
                'ceph-coverage',
                '{tdir}/archive/coverage'.format(tdir=testdir),
                'timeout',
                '120',
                'ceph',
                '--cluster',
                self.cluster,
                '--log-early',
            ]
            ceph_args.extend(args)
            proc = self.controller.run(
                args=ceph_args,
                stdout=BytesIO(),
            )
        return six.ensure_str(proc.stdout.getvalue())
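
    # Example (illustrative, not part of the original file): raw_cluster_cmd()
    # returns the command's stdout as a str, so JSON output can be parsed
    # directly, e.g.
    #
    #   osd_dump = json.loads(manager.raw_cluster_cmd('osd', 'dump', '--format=json'))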

    def raw_cluster_cmd_result(self, *args, **kwargs):
        """
        Start ceph on a cluster. Return success or failure information.
        """
        if self.cephadm:
            proc = shell(self.ctx, self.cluster, self.controller,
                         args=['ceph'] + list(args),
                         check_status=False)
        else:
            testdir = teuthology.get_testdir(self.ctx)
            ceph_args = [
                'sudo',
                'adjust-ulimits',
                'ceph-coverage',
                '{tdir}/archive/coverage'.format(tdir=testdir),
                'timeout',
                '120',
                'ceph',
                '--cluster',
                self.cluster,
            ]
            ceph_args.extend(args)
            kwargs['args'] = ceph_args
            kwargs['check_status'] = False
            proc = self.controller.run(**kwargs)
        return proc.exitstatus

    def run_ceph_w(self, watch_channel=None):
        """
        Execute "ceph -w" in the background with stdout connected to a BytesIO,
        and return the RemoteProcess.

        :param watch_channel: Specifies the channel to be watched. This can be
                              'cluster', 'audit', ...
        :type watch_channel: str
        """
        args = ["sudo",
                "daemon-helper",
                "kill",
                "ceph",
                '--cluster',
                self.cluster,
                "-w"]
        if watch_channel is not None:
            args.append("--watch-channel")
            args.append(watch_channel)
        return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)

    def get_mon_socks(self):
        """
        Get monitor sockets.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def get_msgrv1_mon_socks(self):
        """
        Get monitor sockets that use msgrv1 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                if addrvec_mem['type'] == 'v1':
                    socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def get_msgrv2_mon_socks(self):
        """
        Get monitor sockets that use msgrv2 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                if addrvec_mem['type'] == 'v2':
                    socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
        """
        Flush pg stats from a list of OSD ids, ensuring they are reflected
        all the way to the monitor.  Luminous and later only.

        :param osds: list of OSDs to flush
        :param no_wait: list of OSDs not to wait for seq id. by default, we
                        wait for all specified osds, but some of them could be
                        moved out of osdmap, so we cannot get their updated
                        stat seq from monitor anymore. in that case, you need
                        to pass a blacklist.
        :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
                             it. (5 min by default)
        """
        seq = {osd: int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
               for osd in osds}
        if not wait_for_mon:
            return
        if no_wait is None:
            no_wait = []
        for osd, need in seq.items():
            if osd in no_wait:
                continue
            got = 0
            while wait_for_mon > 0:
                got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
                self.log('need seq {need} got {got} for osd.{osd}'.format(
                    need=need, got=got, osd=osd))
                if got >= need:
                    break
                A_WHILE = 1
                time.sleep(A_WHILE)
                wait_for_mon -= A_WHILE
            else:
                raise Exception('timed out waiting for mon to be updated with '
                                'osd.{osd}: {got} < {need}'.
                                format(osd=osd, got=got, need=need))

    def flush_all_pg_stats(self):
        self.flush_pg_stats(range(len(self.get_osd_dump())))

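    # Example (illustrative, not part of the original file): after restarting
    # a couple of OSDs, a test might push their stats to the monitor with
    #
    #   manager.flush_pg_stats([0, 1])
    #   manager.flush_pg_stats([2, 3], no_wait=[3])  # don't wait on osd.3's seq
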
    def do_rados(self, remote, cmd, check_status=True):
        """
        Execute a remote rados command.
        """
        testdir = teuthology.get_testdir(self.ctx)
        pre = [
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'rados',
            '--cluster',
            self.cluster,
            ]
        pre.extend(cmd)
        proc = remote.run(
            args=pre,
            wait=True,
            check_status=check_status
            )
        return proc

    def rados_write_objects(self, pool, num_objects, size,
                            timelimit, threads, cleanup=False):
        """
        Write rados objects
        Threads not used yet.
        """
        args = [
            '-p', pool,
            '--num-objects', num_objects,
            '-b', size,
            'bench', timelimit,
            'write'
            ]
        if not cleanup:
            args.append('--no-cleanup')
        return self.do_rados(self.controller, map(str, args))

    def do_put(self, pool, obj, fname, namespace=None):
        """
        Implement rados put operation
        """
        args = ['-p', pool]
        if namespace is not None:
            args += ['-N', namespace]
        args += [
            'put',
            obj,
            fname
        ]
        return self.do_rados(
            self.controller,
            args,
            check_status=False
        ).exitstatus

    def do_get(self, pool, obj, fname='/dev/null', namespace=None):
        """
        Implement rados get operation
        """
        args = ['-p', pool]
        if namespace is not None:
            args += ['-N', namespace]
        args += [
            'get',
            obj,
            fname
        ]
        return self.do_rados(
            self.controller,
            args,
            check_status=False
        ).exitstatus

    def do_rm(self, pool, obj, namespace=None):
        """
        Implement rados rm operation
        """
        args = ['-p', pool]
        if namespace is not None:
            args += ['-N', namespace]
        args += [
            'rm',
            obj
        ]
        return self.do_rados(
            self.controller,
            args,
            check_status=False
        ).exitstatus

    def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
        if stdout is None:
            stdout = StringIO()
        return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)

    def find_remote(self, service_type, service_id):
        """
        Get the Remote for the host where a particular service runs.

        :param service_type: 'mds', 'osd', 'client'
        :param service_id: The second part of a role, e.g. '0' for
                           the role 'client.0'
        :return: a Remote instance for the host where the
                 requested role is placed
        """
        return get_remote(self.ctx, self.cluster,
                          service_type, service_id)

    def admin_socket(self, service_type, service_id,
                     command, check_status=True, timeout=0, stdout=None):
        """
        Remotely start up ceph specifying the admin socket
        :param command: a list of words to use as the command
                        to the admin socket
        """
        if stdout is None:
            stdout = StringIO()

        remote = self.find_remote(service_type, service_id)

        if self.cephadm:
            return shell(
                self.ctx, self.cluster, remote,
                args=[
                    'ceph', 'daemon', '%s.%s' % (service_type, service_id),
                ] + command,
                stdout=stdout,
                wait=True,
                check_status=check_status,
            )

        testdir = teuthology.get_testdir(self.ctx)
        args = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'timeout',
            str(timeout),
            'ceph',
            '--cluster',
            self.cluster,
            '--admin-daemon',
            '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
                cluster=self.cluster,
                type=service_type,
                id=service_id),
        ]
        args.extend(command)
        return remote.run(
            args=args,
            stdout=stdout,
            wait=True,
            check_status=check_status
        )

    def objectstore_tool(self, pool, options, args, **kwargs):
        return ObjectStoreTool(self, pool, **kwargs).run(options, args)

    def get_pgid(self, pool, pgnum):
        """
        :param pool: pool name
        :param pgnum: pg number
        :returns: a string representing this pg.
        """
        poolnum = self.get_pool_num(pool)
        pg_str = "{poolnum}.{pgnum}".format(
            poolnum=poolnum,
            pgnum=pgnum)
        return pg_str

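    # For example (illustrative): if pool 'foo' maps to pool number 2, then
    # get_pgid('foo', 12) returns '2.12'.
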
1675 def get_pg_replica(self, pool, pgnum):
1676 """
1677 get replica for pool, pgnum (e.g. (data, 0)->0
1678 """
1679 pg_str = self.get_pgid(pool, pgnum)
1680 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1681 j = json.loads('\n'.join(output.split('\n')[1:]))
1682 return int(j['acting'][-1])
1683 assert False
1684
1685 def wait_for_pg_stats(func):
11fdf7f2 1686 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
7c673cae
FG
1687 # by default; taking injected fault delays (in ms) into consideration,
1688 # the backoff schedule below is more than enough
28e407b8 1689 delays = [1, 1, 2, 3, 5, 8, 13, 0]
7c673cae
FG
1690 @wraps(func)
1691 def wrapper(self, *args, **kwargs):
1692 exc = None
1693 for delay in delays:
1694 try:
1695 return func(self, *args, **kwargs)
1696 except AssertionError as e:
1697 time.sleep(delay)
1698 exc = e
1699 raise exc
1700 return wrapper
1701
1702 def get_pg_primary(self, pool, pgnum):
1703 """
1704 get primary for pool, pgnum (e.g. (data, 0) -> 0)
1705 """
1706 pg_str = self.get_pgid(pool, pgnum)
1707 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1708 j = json.loads('\n'.join(output.split('\n')[1:]))
1709 return int(j['acting'][0])
1710 assert False
1711
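    # Illustrative sketch, not part of the upstream file: resolve pg 0 of a
    # pool to its pgid, primary and one replica.  Assumes the pool exists
    # and is replicated across at least two OSDs.
    def _example_describe_pg(self, pool, pgnum=0):
        return {
            'pgid': self.get_pgid(pool, pgnum),
            'primary': self.get_pg_primary(pool, pgnum),
            'replica': self.get_pg_replica(pool, pgnum),
        }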
1712 def get_pool_num(self, pool):
1713 """
1714 get number for pool (e.g., data -> 2)
1715 """
1716 return int(self.get_pool_dump(pool)['pool'])
1717
1718 def list_pools(self):
1719 """
1720 list all pool names
1721 """
1722 osd_dump = self.get_osd_dump_json()
1723 self.log(osd_dump['pools'])
1724 return [str(i['pool_name']) for i in osd_dump['pools']]
1725
1726 def clear_pools(self):
1727 """
1728 remove all pools
1729 """
1730 [self.remove_pool(i) for i in self.list_pools()]
1731
1732 def kick_recovery_wq(self, osdnum):
1733 """
1734 Run kick_recovery_wq on cluster.
1735 """
1736 return self.raw_cluster_cmd(
1737 'tell', "osd.%d" % (int(osdnum),),
1738 'debug',
1739 'kick_recovery_wq',
1740 '0')
1741
1742 def wait_run_admin_socket(self, service_type,
1743 service_id, args=['version'], timeout=75, stdout=None):
1744 """
11fdf7f2 1745 If the admin_socket call succeeds, return. Otherwise wait
7c673cae
FG
1746 five seconds and try again.
1747 """
1748 if stdout is None:
e306af50 1749 stdout = StringIO()
7c673cae
FG
1750 tries = 0
1751 while True:
1752 proc = self.admin_socket(service_type, service_id,
1753 args, check_status=False, stdout=stdout)
9f95a23c 1754 if proc.exitstatus == 0:
7c673cae
FG
1755 return proc
1756 else:
1757 tries += 1
1758 if (tries * 5) > timeout:
1759 raise Exception('timed out waiting for admin_socket '
1760 'to appear after {type}.{id} restart'.
1761 format(type=service_type,
1762 id=service_id))
1763 self.log("waiting on admin_socket for {type}-{id}, "
1764 "{command}".format(type=service_type,
1765 id=service_id,
1766 command=args))
1767 time.sleep(5)
1768
1769 def get_pool_dump(self, pool):
1770 """
1771 get the osd dump part of a pool
1772 """
1773 osd_dump = self.get_osd_dump_json()
1774 for i in osd_dump['pools']:
1775 if i['pool_name'] == pool:
1776 return i
1777 assert False
1778
1779 def get_config(self, service_type, service_id, name):
1780 """
1781 :param service_type, service_id: e.g. 'mon' and 'a' for mon.a
1782 :param name: the option name
1783 """
1784 proc = self.wait_run_admin_socket(service_type, service_id,
1785 ['config', 'show'])
1786 j = json.loads(proc.stdout.getvalue())
1787 return j[name]
1788
11fdf7f2
TL
1789 def inject_args(self, service_type, service_id, name, value):
1790 whom = '{0}.{1}'.format(service_type, service_id)
1791 if isinstance(value, bool):
1792 value = 'true' if value else 'false'
1793 opt_arg = '--{name}={value}'.format(name=name, value=value)
1794 self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
1795
7c673cae
FG
1796 def set_config(self, osdnum, **argdict):
1797 """
1798 :param osdnum: osd number
1799 :param argdict: dictionary containing values to set.
1800 """
9f95a23c 1801 for k, v in argdict.items():
7c673cae
FG
1802 self.wait_run_admin_socket(
1803 'osd', osdnum,
1804 ['config', 'set', str(k), str(v)])
1805
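    # Illustrative sketch, not part of the upstream file: the two runtime
    # configuration paths shown above -- 'tell ... injectargs' via the mons
    # versus 'config set' via the admin socket.  The option names are only
    # examples.
    def _example_tune_osd_recovery(self, osdnum=0):
        # goes through the monitors; does not need the admin socket
        self.inject_args('osd', osdnum, 'osd_recovery_max_active', 3)
        # goes through the admin socket; waits until the socket answers
        self.set_config(osdnum, osd_recovery_sleep='0.1')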
1806 def raw_cluster_status(self):
1807 """
1808 Get status from cluster
1809 """
9f95a23c 1810 status = self.raw_cluster_cmd('status', '--format=json')
7c673cae
FG
1811 return json.loads(status)
1812
1813 def raw_osd_status(self):
1814 """
1815 Get osd status from cluster
1816 """
1817 return self.raw_cluster_cmd('osd', 'dump')
1818
1819 def get_osd_status(self):
1820 """
1821 Get osd statuses sorted by states that the osds are in.
1822 """
e306af50 1823 osd_lines = list(filter(
7c673cae 1824 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
e306af50 1825 self.raw_osd_status().split('\n')))
7c673cae
FG
1826 self.log(osd_lines)
1827 in_osds = [int(i[4:].split()[0])
1828 for i in filter(lambda x: " in " in x, osd_lines)]
1829 out_osds = [int(i[4:].split()[0])
1830 for i in filter(lambda x: " out " in x, osd_lines)]
1831 up_osds = [int(i[4:].split()[0])
1832 for i in filter(lambda x: " up " in x, osd_lines)]
1833 down_osds = [int(i[4:].split()[0])
1834 for i in filter(lambda x: " down " in x, osd_lines)]
1835 dead_osds = [int(x.id_)
1836 for x in filter(lambda x:
1837 not x.running(),
1838 self.ctx.daemons.
1839 iter_daemons_of_role('osd', self.cluster))]
1840 live_osds = [int(x.id_) for x in
1841 filter(lambda x:
1842 x.running(),
1843 self.ctx.daemons.iter_daemons_of_role('osd',
1844 self.cluster))]
1845 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
1846 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
1847 'raw': osd_lines}
1848
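    # Illustrative sketch, not part of the upstream file: summarize OSD state
    # with get_osd_status(), e.g. to decide whether it is safe to kill
    # another OSD during thrashing.  The threshold is an arbitrary example.
    def _example_osds_safe_to_thrash(self, minimum_live=3):
        status = self.get_osd_status()
        self.log('live osds: %s, dead osds: %s'
                 % (status['live'], status['dead']))
        return len(status['live']) > minimum_live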
1849 def get_num_pgs(self):
1850 """
1851 Check cluster status for the number of pgs
1852 """
1853 status = self.raw_cluster_status()
1854 self.log(status)
1855 return status['pgmap']['num_pgs']
1856
1857 def create_erasure_code_profile(self, profile_name, profile):
1858 """
1859 Create an erasure code profile that can be used as a parameter
1860 when creating an erasure coded pool.
1861 """
1862 with self.lock:
1863 args = cmd_erasure_code_profile(profile_name, profile)
1864 self.raw_cluster_cmd(*args)
1865
1866 def create_pool_with_unique_name(self, pg_num=16,
1867 erasure_code_profile_name=None,
1868 min_size=None,
1869 erasure_code_use_overwrites=False):
1870 """
1871 Create a pool named unique_pool_X where X is unique.
1872 """
1873 name = ""
1874 with self.lock:
1875 name = "unique_pool_%s" % (str(self.next_pool_id),)
1876 self.next_pool_id += 1
1877 self.create_pool(
1878 name,
1879 pg_num,
1880 erasure_code_profile_name=erasure_code_profile_name,
1881 min_size=min_size,
1882 erasure_code_use_overwrites=erasure_code_use_overwrites)
1883 return name
1884
1885 @contextlib.contextmanager
1886 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
1887 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
1888 yield
1889 self.remove_pool(pool_name)
1890
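    # Illustrative sketch, not part of the upstream file: the context manager
    # above scopes a pool's lifetime to a test block -- the pool is created on
    # entry and removed on a clean exit.  The pool name is an example.
    def _example_temporary_pool(self):
        with self.pool('example-temp-pool', pg_num=8):
            # the pool exists inside this block; run the test I/O here
            assert self.pool_exists('example-temp-pool')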
1891 def create_pool(self, pool_name, pg_num=16,
1892 erasure_code_profile_name=None,
1893 min_size=None,
1894 erasure_code_use_overwrites=False):
1895 """
1896 Create a pool with the name given by pool_name.
1897 :param pool_name: name of the pool being created.
1898 :param pg_num: initial number of pgs.
1899 :param erasure_code_profile_name: if not None, create an
1900 erasure coded pool using the profile
1901 :param erasure_code_use_overwrites: if true, allow overwrites
1902 """
1903 with self.lock:
9f95a23c 1904 assert isinstance(pool_name, six.string_types)
7c673cae
FG
1905 assert isinstance(pg_num, int)
1906 assert pool_name not in self.pools
1907 self.log("creating pool_name %s" % (pool_name,))
1908 if erasure_code_profile_name:
1909 self.raw_cluster_cmd('osd', 'pool', 'create',
1910 pool_name, str(pg_num), str(pg_num),
1911 'erasure', erasure_code_profile_name)
1912 else:
1913 self.raw_cluster_cmd('osd', 'pool', 'create',
1914 pool_name, str(pg_num))
1915 if min_size is not None:
1916 self.raw_cluster_cmd(
1917 'osd', 'pool', 'set', pool_name,
1918 'min_size',
1919 str(min_size))
1920 if erasure_code_use_overwrites:
1921 self.raw_cluster_cmd(
1922 'osd', 'pool', 'set', pool_name,
1923 'allow_ec_overwrites',
1924 'true')
c07f9fc5
FG
1925 self.raw_cluster_cmd(
1926 'osd', 'pool', 'application', 'enable',
1927 pool_name, 'rados', '--yes-i-really-mean-it',
1928 run.Raw('||'), 'true')
7c673cae
FG
1929 self.pools[pool_name] = pg_num
1930 time.sleep(1)
1931
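    # Illustrative sketch, not part of the upstream file: create an erasure
    # coded pool that allows overwrites.  The profile parameters are examples
    # only; any k/m layout the test cluster can satisfy would work.
    def _example_create_ec_pool(self):
        self.create_erasure_code_profile(
            'example-profile',
            {'k': 2, 'm': 1, 'crush-failure-domain': 'osd'})
        return self.create_pool_with_unique_name(
            pg_num=8,
            erasure_code_profile_name='example-profile',
            erasure_code_use_overwrites=True)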
1932 def add_pool_snap(self, pool_name, snap_name):
1933 """
1934 Add pool snapshot
1935 :param pool_name: name of pool to snapshot
1936 :param snap_name: name of snapshot to take
1937 """
1938 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
1939 str(pool_name), str(snap_name))
1940
1941 def remove_pool_snap(self, pool_name, snap_name):
1942 """
1943 Remove pool snapshot
1944 :param pool_name: name of pool to snapshot
1945 :param snap_name: name of snapshot to remove
1946 """
1947 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1948 str(pool_name), str(snap_name))
1949
1950 def remove_pool(self, pool_name):
1951 """
1952 Remove the indicated pool
1953 :param pool_name: Pool to be removed
1954 """
1955 with self.lock:
9f95a23c 1956 assert isinstance(pool_name, six.string_types)
7c673cae
FG
1957 assert pool_name in self.pools
1958 self.log("removing pool_name %s" % (pool_name,))
1959 del self.pools[pool_name]
11fdf7f2
TL
1960 self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
1961 "--yes-i-really-really-mean-it")
7c673cae
FG
1962
1963 def get_pool(self):
1964 """
1965 Pick a random pool
1966 """
1967 with self.lock:
9f95a23c 1968 if self.pools:
e306af50 1969 return random.sample(self.pools.keys(), 1)[0]
7c673cae
FG
1970
1971 def get_pool_pg_num(self, pool_name):
1972 """
1973 Return the number of pgs in the pool specified.
1974 """
1975 with self.lock:
9f95a23c 1976 assert isinstance(pool_name, six.string_types)
7c673cae
FG
1977 if pool_name in self.pools:
1978 return self.pools[pool_name]
1979 return 0
1980
1981 def get_pool_property(self, pool_name, prop):
1982 """
1983 :param pool_name: pool
1984 :param prop: property to be checked.
9f95a23c 1985 :returns: property as string
7c673cae
FG
1986 """
1987 with self.lock:
9f95a23c
TL
1988 assert isinstance(pool_name, six.string_types)
1989 assert isinstance(prop, six.string_types)
7c673cae
FG
1990 output = self.raw_cluster_cmd(
1991 'osd',
1992 'pool',
1993 'get',
1994 pool_name,
1995 prop)
9f95a23c
TL
1996 return output.split()[1]
1997
1998 def get_pool_int_property(self, pool_name, prop):
1999 return int(self.get_pool_property(pool_name, prop))
7c673cae
FG
2000
2001 def set_pool_property(self, pool_name, prop, val):
2002 """
2003 :param pool_name: pool
2004 :param prop: property to be set.
2005 :param val: value to set.
2006
2007 This routine retries if set operation fails.
2008 """
2009 with self.lock:
9f95a23c
TL
2010 assert isinstance(pool_name, six.string_types)
2011 assert isinstance(prop, six.string_types)
7c673cae
FG
2012 assert isinstance(val, int)
2013 tries = 0
2014 while True:
2015 r = self.raw_cluster_cmd_result(
2016 'osd',
2017 'pool',
2018 'set',
2019 pool_name,
2020 prop,
2021 str(val))
2022 if r != 11: # EAGAIN
2023 break
2024 tries += 1
2025 if tries > 50:
2026 raise Exception('timed out getting EAGAIN '
2027 'when setting pool property %s %s = %s' %
2028 (pool_name, prop, val))
2029 self.log('got EAGAIN setting pool property, '
2030 'waiting a few seconds...')
2031 time.sleep(2)
2032
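    # Illustrative sketch, not part of the upstream file: set a pool property
    # and read it back as an int.  set_pool_property() already retries on
    # EAGAIN, so no extra error handling is needed here.
    def _example_set_min_size(self, pool_name, min_size=2):
        self.set_pool_property(pool_name, 'min_size', min_size)
        assert self.get_pool_int_property(pool_name, 'min_size') == min_size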
2033 def expand_pool(self, pool_name, by, max_pgs):
2034 """
2035 Increase the number of pgs in a pool
2036 """
2037 with self.lock:
9f95a23c 2038 assert isinstance(pool_name, six.string_types)
7c673cae
FG
2039 assert isinstance(by, int)
2040 assert pool_name in self.pools
2041 if self.get_num_creating() > 0:
2042 return False
2043 if (self.pools[pool_name] + by) > max_pgs:
2044 return False
2045 self.log("increase pool size by %d" % (by,))
2046 new_pg_num = self.pools[pool_name] + by
2047 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2048 self.pools[pool_name] = new_pg_num
2049 return True
2050
11fdf7f2
TL
2051 def contract_pool(self, pool_name, by, min_pgs):
2052 """
2053 Decrease the number of pgs in a pool
2054 """
2055 with self.lock:
2056 self.log('contract_pool %s by %s min %s' % (
2057 pool_name, str(by), str(min_pgs)))
9f95a23c 2058 assert isinstance(pool_name, six.string_types)
11fdf7f2
TL
2059 assert isinstance(by, int)
2060 assert pool_name in self.pools
2061 if self.get_num_creating() > 0:
2062 self.log('too many creating')
2063 return False
2064 proj = self.pools[pool_name] - by
2065 if proj < min_pgs:
2066 self.log('would drop below min_pgs, proj %d, currently %d' % (proj,self.pools[pool_name],))
2067 return False
2068 self.log("decrease pool size by %d" % (by,))
2069 new_pg_num = self.pools[pool_name] - by
2070 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2071 self.pools[pool_name] = new_pg_num
2072 return True
2073
2074 def stop_pg_num_changes(self):
2075 """
2076 Reset all pg_num_targets back to pg_num, canceling splits and merges
2077 """
2078 self.log('Canceling any pending splits or merges...')
2079 osd_dump = self.get_osd_dump_json()
9f95a23c
TL
2080 try:
2081 for pool in osd_dump['pools']:
2082 if pool['pg_num'] != pool['pg_num_target']:
2083 self.log('Setting pool %s (%d) pg_num %d -> %d' %
2084 (pool['pool_name'], pool['pool'],
2085 pool['pg_num_target'],
2086 pool['pg_num']))
2087 self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
2088 'pg_num', str(pool['pg_num']))
2089 except KeyError:
2090 # we don't support pg_num_target before nautilus
2091 pass
11fdf7f2 2092
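    # Illustrative sketch, not part of the upstream file: grow a pool by a
    # few PGs while respecting an upper bound; expand_pool() refuses to act
    # while PGs are still being created.
    def _example_bounded_pg_split(self, pool_name, by=8, max_pgs=64):
        if self.expand_pool(pool_name, by, max_pgs):
            self.log('pg_num of %s raised by %d' % (pool_name, by))
        else:
            self.log('split skipped: PGs creating or max_pgs would be exceeded')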
7c673cae
FG
2093 def set_pool_pgpnum(self, pool_name, force):
2094 """
2095 Set pgpnum property of pool_name pool.
2096 """
2097 with self.lock:
9f95a23c 2098 assert isinstance(pool_name, six.string_types)
7c673cae
FG
2099 assert pool_name in self.pools
2100 if not force and self.get_num_creating() > 0:
2101 return False
2102 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
2103 return True
2104
11fdf7f2 2105 def list_pg_unfound(self, pgid):
7c673cae 2106 """
11fdf7f2 2107 return the unfound objects for the pg with the id specified
7c673cae
FG
2108 """
2109 r = None
2110 offset = {}
2111 while True:
11fdf7f2 2112 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
7c673cae
FG
2113 json.dumps(offset))
2114 j = json.loads(out)
2115 if r is None:
2116 r = j
2117 else:
2118 r['objects'].extend(j['objects'])
2119 if not 'more' in j:
2120 break
2121 if j['more'] == 0:
2122 break
2123 offset = j['objects'][-1]['oid']
2124 if 'more' in r:
2125 del r['more']
2126 return r
2127
2128 def get_pg_stats(self):
2129 """
2130 Dump the cluster and get pg stats
2131 """
2132 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
2133 j = json.loads('\n'.join(out.split('\n')[1:]))
11fdf7f2
TL
2134 try:
2135 return j['pg_map']['pg_stats']
2136 except KeyError:
2137 return j['pg_stats']
7c673cae 2138
c07f9fc5
FG
2139 def get_pgids_to_force(self, backfill):
2140 """
2141 Return the randomized list of PGs that can have their recovery/backfill forced
2142 """
2143 j = self.get_pg_stats()
2144 pgids = []
2145 if backfill:
2146 wanted = ['degraded', 'backfilling', 'backfill_wait']
2147 else:
2148 wanted = ['recovering', 'degraded', 'recovery_wait']
2149 for pg in j:
2150 status = pg['state'].split('+')
2151 for t in wanted:
2152 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
2153 pgids.append(pg['pgid'])
2154 break
2155 return pgids
2156
2157 def get_pgids_to_cancel_force(self, backfill):
2158 """
2159 Return the randomized list of PGs whose recovery/backfill priority is forced
2160 """
2161 j = self.get_pg_stats()
2162 pgids = []
2163 if backfill:
2164 wanted = 'forced_backfill'
2165 else:
2166 wanted = 'forced_recovery'
2167 for pg in j:
2168 status = pg['state'].split('+')
2169 if wanted in status and random.random() > 0.5:
2170 pgids.append(pg['pgid'])
2171 return pgids
2172
7c673cae
FG
2173 def compile_pg_status(self):
2174 """
2175 Return a histogram of pg state values
2176 """
2177 ret = {}
2178 j = self.get_pg_stats()
2179 for pg in j:
2180 for status in pg['state'].split('+'):
2181 if status not in ret:
2182 ret[status] = 0
2183 ret[status] += 1
2184 return ret
2185
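    # Illustrative sketch, not part of the upstream file: use the histogram
    # above to log a one-line cluster summary, e.g. between thrash rounds.
    def _example_log_pg_state_summary(self):
        hist = self.compile_pg_status()
        self.log('pg states: ' + ', '.join(
            '%s=%d' % (state, count)
            for state, count in sorted(hist.items())))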
9f95a23c 2186 @wait_for_pg_stats # type: ignore
7c673cae
FG
2187 def with_pg_state(self, pool, pgnum, check):
2188 pgstr = self.get_pgid(pool, pgnum)
2189 stats = self.get_single_pg_stats(pgstr)
2190 assert(check(stats['state']))
2191
9f95a23c 2192 @wait_for_pg_stats # type: ignore
7c673cae
FG
2193 def with_pg(self, pool, pgnum, check):
2194 pgstr = self.get_pgid(pool, pgnum)
2195 stats = self.get_single_pg_stats(pgstr)
2196 return check(stats)
2197
2198 def get_last_scrub_stamp(self, pool, pgnum):
2199 """
2200 Get the timestamp of the last scrub.
2201 """
2202 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
2203 return stats["last_scrub_stamp"]
2204
2205 def do_pg_scrub(self, pool, pgnum, stype):
2206 """
2207 Scrub pg and wait for scrubbing to finish
2208 """
2209 init = self.get_last_scrub_stamp(pool, pgnum)
2210 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
2211 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
2212 SLEEP_TIME = 10
2213 timer = 0
2214 while init == self.get_last_scrub_stamp(pool, pgnum):
2215 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
2216 self.log("waiting for scrub type %s" % (stype,))
2217 if (timer % RESEND_TIMEOUT) == 0:
2218 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
2219 # The first time in this loop is the actual request
2220 if timer != 0 and stype == "repair":
2221 self.log("WARNING: Resubmitted a non-idempotent repair")
2222 time.sleep(SLEEP_TIME)
2223 timer += SLEEP_TIME
2224
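    # Illustrative sketch, not part of the upstream file: deep-scrub pg 0 of
    # a pool and block until its last_scrub_stamp advances.  'scrub',
    # 'deep-scrub' and 'repair' are the stype values understood above.
    def _example_deep_scrub_first_pg(self, pool):
        self.do_pg_scrub(pool, 0, 'deep-scrub')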
2225 def wait_snap_trimming_complete(self, pool):
2226 """
2227 Wait for snap trimming on pool to end
2228 """
2229 POLL_PERIOD = 10
2230 FATAL_TIMEOUT = 600
2231 start = time.time()
2232 poolnum = self.get_pool_num(pool)
2233 poolnumstr = "%s." % (poolnum,)
2234 while (True):
2235 now = time.time()
2236 if (now - start) > FATAL_TIMEOUT:
2237 assert (now - start) < FATAL_TIMEOUT, \
2238 'failed to complete snap trimming before timeout'
2239 all_stats = self.get_pg_stats()
2240 trimming = False
2241 for pg in all_stats:
2242 if pg['pgid'].startswith(poolnumstr) and ('snaptrim' in pg['state']):
2243 self.log("pg {pg} in trimming, state: {state}".format(
2244 pg=pg['pgid'],
2245 state=pg['state']))
2246 trimming = True
2247 if not trimming:
2248 break
2249 self.log("{pool} still trimming, waiting".format(pool=pool))
2250 time.sleep(POLL_PERIOD)
2251
2252 def get_single_pg_stats(self, pgid):
2253 """
2254 Return pg for the pgid specified.
2255 """
2256 all_stats = self.get_pg_stats()
2257
2258 for pg in all_stats:
2259 if pg['pgid'] == pgid:
2260 return pg
2261
2262 return None
2263
2264 def get_object_pg_with_shard(self, pool, name, osdid):
2265 """
2266 Return the pgid storing the object; for erasure coded pools the
     shard index of the given osd is appended (e.g. "2.1s0").
     """
2267 pool_dump = self.get_pool_dump(pool)
2268 object_map = self.get_object_map(pool, name)
9f95a23c 2269 if pool_dump["type"] == PoolType.ERASURE_CODED:
7c673cae
FG
2270 shard = object_map['acting'].index(osdid)
2271 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
2272 shard=shard)
2273 else:
2274 return object_map['pgid']
2275
2276 def get_object_primary(self, pool, name):
2277 """
2278 Return the id of the acting primary osd for the given object.
     """
2279 object_map = self.get_object_map(pool, name)
2280 return object_map['acting_primary']
2281
2282 def get_object_map(self, pool, name):
2283 """
2284 osd map --format=json converted to a python object
2285 :returns: the python object
2286 """
2287 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
2288 return json.loads('\n'.join(out.split('\n')[1:]))
2289
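    # Illustrative sketch, not part of the upstream file: locate an object
    # and report which pg (and shard, for EC pools) its primary OSD serves,
    # e.g. before deliberately killing that OSD.  Assumes the object exists.
    def _example_locate_object(self, pool, name):
        primary = self.get_object_primary(pool, name)
        pgid = self.get_object_pg_with_shard(pool, name, primary)
        self.log('%s/%s -> pg %s on primary osd.%d' % (pool, name, pgid, primary))
        return pgid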
2290 def get_osd_dump_json(self):
2291 """
2292 osd dump --format=json converted to a python object
2293 :returns: the python object
2294 """
2295 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
2296 return json.loads('\n'.join(out.split('\n')[1:]))
2297
2298 def get_osd_dump(self):
2299 """
2300 Dump osds
2301 :returns: all osds
2302 """
2303 return self.get_osd_dump_json()['osds']
2304
11fdf7f2
TL
2305 def get_osd_metadata(self):
2306 """
2307 osd metadata --format=json converted to a python object
2308 :returns: the python object containing osd metadata information
2309 """
2310 out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
2311 return json.loads('\n'.join(out.split('\n')[1:]))
2312
c07f9fc5
FG
2313 def get_mgr_dump(self):
2314 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2315 return json.loads(out)
2316
7c673cae
FG
2317 def get_stuck_pgs(self, type_, threshold):
2318 """
2319 :returns: stuck pg information from the cluster
2320 """
2321 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2322 '--format=json')
11fdf7f2 2323 return json.loads(out).get('stuck_pg_stats',[])
7c673cae
FG
2324
2325 def get_num_unfound_objects(self):
2326 """
2327 Check cluster status to get the number of unfound objects
2328 """
2329 status = self.raw_cluster_status()
2330 self.log(status)
2331 return status['pgmap'].get('unfound_objects', 0)
2332
2333 def get_num_creating(self):
2334 """
2335 Find the number of pgs in creating mode.
2336 """
2337 pgs = self.get_pg_stats()
2338 num = 0
2339 for pg in pgs:
2340 if 'creating' in pg['state']:
2341 num += 1
2342 return num
2343
2344 def get_num_active_clean(self):
2345 """
2346 Find the number of active and clean pgs.
2347 """
2348 pgs = self.get_pg_stats()
9f95a23c
TL
2349 return self._get_num_active_clean(pgs)
2350
2351 def _get_num_active_clean(self, pgs):
7c673cae
FG
2352 num = 0
2353 for pg in pgs:
2354 if (pg['state'].count('active') and
2355 pg['state'].count('clean') and
2356 not pg['state'].count('stale')):
2357 num += 1
2358 return num
2359
2360 def get_num_active_recovered(self):
2361 """
2362 Find the number of active and recovered pgs.
2363 """
2364 pgs = self.get_pg_stats()
9f95a23c
TL
2365 return self._get_num_active_recovered(pgs)
2366
2367 def _get_num_active_recovered(self, pgs):
7c673cae
FG
2368 num = 0
2369 for pg in pgs:
2370 if (pg['state'].count('active') and
2371 not pg['state'].count('recover') and
3efd9988 2372 not pg['state'].count('backfilling') and
7c673cae
FG
2373 not pg['state'].count('stale')):
2374 num += 1
2375 return num
2376
2377 def get_is_making_recovery_progress(self):
2378 """
2379 Return whether there is recovery progress discernable in the
2380 raw cluster status
2381 """
2382 status = self.raw_cluster_status()
2383 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2384 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2385 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2386 return kps > 0 or bps > 0 or ops > 0
2387
2388 def get_num_active(self):
2389 """
2390 Find the number of active pgs.
2391 """
2392 pgs = self.get_pg_stats()
9f95a23c
TL
2393 return self._get_num_active(pgs)
2394
2395 def _get_num_active(self, pgs):
7c673cae
FG
2396 num = 0
2397 for pg in pgs:
2398 if pg['state'].count('active') and not pg['state'].count('stale'):
2399 num += 1
2400 return num
2401
2402 def get_num_down(self):
2403 """
2404 Find the number of pgs that are down.
2405 """
2406 pgs = self.get_pg_stats()
2407 num = 0
2408 for pg in pgs:
2409 if ((pg['state'].count('down') and not
2410 pg['state'].count('stale')) or
2411 (pg['state'].count('incomplete') and not
2412 pg['state'].count('stale'))):
2413 num += 1
2414 return num
2415
2416 def get_num_active_down(self):
2417 """
2418 Find the number of pgs that are either active or down.
2419 """
2420 pgs = self.get_pg_stats()
9f95a23c
TL
2421 return self._get_num_active_down(pgs)
2422
2423 def _get_num_active_down(self, pgs):
7c673cae
FG
2424 num = 0
2425 for pg in pgs:
2426 if ((pg['state'].count('active') and not
2427 pg['state'].count('stale')) or
2428 (pg['state'].count('down') and not
2429 pg['state'].count('stale')) or
2430 (pg['state'].count('incomplete') and not
2431 pg['state'].count('stale'))):
2432 num += 1
2433 return num
2434
9f95a23c
TL
2435 def get_num_peered(self):
2436 """
2437 Find the number of PGs that are peered
2438 """
2439 pgs = self.get_pg_stats()
2440 return self._get_num_peered(pgs)
2441
2442 def _get_num_peered(self, pgs):
2443 num = 0
2444 for pg in pgs:
2445 if pg['state'].count('peered') and not pg['state'].count('stale'):
2446 num += 1
2447 return num
2448
7c673cae
FG
2449 def is_clean(self):
2450 """
2451 True if all pgs are clean
2452 """
9f95a23c
TL
2453 pgs = self.get_pg_stats()
2454 return self._get_num_active_clean(pgs) == len(pgs)
7c673cae
FG
2455
2456 def is_recovered(self):
2457 """
2458 True if all pgs have recovered
2459 """
9f95a23c
TL
2460 pgs = self.get_pg_stats()
2461 return self._get_num_active_recovered(pgs) == len(pgs)
7c673cae
FG
2462
2463 def is_active_or_down(self):
2464 """
2465 True if all pgs are active or down
2466 """
9f95a23c
TL
2467 pgs = self.get_pg_stats()
2468 return self._get_num_active_down(pgs) == len(pgs)
7c673cae 2469
f6b5b4d7
TL
2470 def dump_pgs_not_active_clean(self):
2471 """
2472 Dumps all pgs that are not active+clean
2473 """
2474 pgs = self.get_pg_stats()
2475 for pg in pgs:
2476 if pg['state'] != 'active+clean':
2477 self.log('PG %s is not active+clean' % pg['pgid'])
2478 self.log(pg)
2479
2480 def dump_pgs_not_active_down(self):
2481 """
2482 Dumps all pgs that are not active or down
2483 """
2484 pgs = self.get_pg_stats()
2485 for pg in pgs:
2486 if 'active' not in pg['state'] and 'down' not in pg['state']:
2487 self.log('PG %s is not active or down' % pg['pgid'])
2488 self.log(pg)
2489
2490 def dump_pgs_not_active(self):
2491 """
2492 Dumps all pgs that are not active
2493 """
2494 pgs = self.get_pg_stats()
2495 for pg in pgs:
2496 if 'active' not in pg['state']:
2497 self.log('PG %s is not active' % pg['pgid'])
2498 self.log(pg)
2499
11fdf7f2 2500 def wait_for_clean(self, timeout=1200):
7c673cae
FG
2501 """
2502 Wait until all pgs are active+clean, asserting if the timeout expires.
2503 """
2504 self.log("waiting for clean")
2505 start = time.time()
2506 num_active_clean = self.get_num_active_clean()
2507 while not self.is_clean():
2508 if timeout is not None:
2509 if self.get_is_making_recovery_progress():
2510 self.log("making progress, resetting timeout")
2511 start = time.time()
2512 else:
2513 self.log("no progress seen, keeping timeout for now")
2514 if time.time() - start >= timeout:
f6b5b4d7
TL
2515 self.log('dumping pgs not clean')
2516 self.dump_pgs_not_active_clean()
7c673cae 2517 assert time.time() - start < timeout, \
f6b5b4d7 2518 'wait_for_clean: failed before timeout expired'
7c673cae
FG
2519 cur_active_clean = self.get_num_active_clean()
2520 if cur_active_clean != num_active_clean:
2521 start = time.time()
2522 num_active_clean = cur_active_clean
2523 time.sleep(3)
2524 self.log("clean!")
2525
2526 def are_all_osds_up(self):
2527 """
2528 Returns true if all osds are up.
2529 """
2530 x = self.get_osd_dump()
2531 return (len(x) == sum([(y['up'] > 0) for y in x]))
2532
c07f9fc5 2533 def wait_for_all_osds_up(self, timeout=None):
7c673cae
FG
2534 """
2535 When this exits, either the timeout has expired, or all
2536 osds are up.
2537 """
2538 self.log("waiting for all up")
2539 start = time.time()
2540 while not self.are_all_osds_up():
2541 if timeout is not None:
2542 assert time.time() - start < timeout, \
c07f9fc5 2543 'timeout expired in wait_for_all_osds_up'
7c673cae
FG
2544 time.sleep(3)
2545 self.log("all up!")
2546
c07f9fc5
FG
2547 def pool_exists(self, pool):
2548 if pool in self.list_pools():
2549 return True
2550 return False
2551
2552 def wait_for_pool(self, pool, timeout=300):
2553 """
2554 Wait for a pool to exist
2555 """
2556 self.log('waiting for pool %s to exist' % pool)
2557 start = time.time()
2558 while not self.pool_exists(pool):
2559 if timeout is not None:
2560 assert time.time() - start < timeout, \
2561 'timeout expired in wait_for_pool'
2562 time.sleep(3)
2563
2564 def wait_for_pools(self, pools):
2565 for pool in pools:
2566 self.wait_for_pool(pool)
2567
2568 def is_mgr_available(self):
2569 x = self.get_mgr_dump()
2570 return x.get('available', False)
2571
2572 def wait_for_mgr_available(self, timeout=None):
2573 self.log("waiting for mgr available")
2574 start = time.time()
2575 while not self.is_mgr_available():
2576 if timeout is not None:
2577 assert time.time() - start < timeout, \
2578 'timeout expired in wait_for_mgr_available'
2579 time.sleep(3)
2580 self.log("mgr available!")
2581
7c673cae
FG
2582 def wait_for_recovery(self, timeout=None):
2583 """
2584 Check peering. When this exits, we have recovered.
2585 """
2586 self.log("waiting for recovery to complete")
2587 start = time.time()
2588 num_active_recovered = self.get_num_active_recovered()
2589 while not self.is_recovered():
2590 now = time.time()
2591 if timeout is not None:
2592 if self.get_is_making_recovery_progress():
2593 self.log("making progress, resetting timeout")
2594 start = time.time()
2595 else:
2596 self.log("no progress seen, keeping timeout for now")
2597 if now - start >= timeout:
9f95a23c
TL
2598 if self.is_recovered():
2599 break
f6b5b4d7
TL
2600 self.log('dumping pgs not recovered yet')
2601 self.dump_pgs_not_active_clean()
7c673cae 2602 assert now - start < timeout, \
f6b5b4d7 2603 'wait_for_recovery: failed before timeout expired'
7c673cae
FG
2604 cur_active_recovered = self.get_num_active_recovered()
2605 if cur_active_recovered != num_active_recovered:
2606 start = time.time()
2607 num_active_recovered = cur_active_recovered
2608 time.sleep(3)
2609 self.log("recovered!")
2610
2611 def wait_for_active(self, timeout=None):
2612 """
2613 Check peering. When this exits, we are definitely active
2614 """
2615 self.log("waiting for peering to complete")
2616 start = time.time()
2617 num_active = self.get_num_active()
2618 while not self.is_active():
2619 if timeout is not None:
2620 if time.time() - start >= timeout:
f6b5b4d7
TL
2621 self.log('dumping pgs not active')
2622 self.dump_pgs_not_active()
7c673cae 2623 assert time.time() - start < timeout, \
f6b5b4d7 2624 'wait_for_active: failed before timeout expired'
7c673cae
FG
2625 cur_active = self.get_num_active()
2626 if cur_active != num_active:
2627 start = time.time()
2628 num_active = cur_active
2629 time.sleep(3)
2630 self.log("active!")
2631
2632 def wait_for_active_or_down(self, timeout=None):
2633 """
2634 Check peering. When this exits, we are definitely either
2635 active or down
2636 """
2637 self.log("waiting for peering to complete or become blocked")
2638 start = time.time()
2639 num_active_down = self.get_num_active_down()
2640 while not self.is_active_or_down():
2641 if timeout is not None:
2642 if time.time() - start >= timeout:
f6b5b4d7
TL
2643 self.log('dumping pgs not active or down')
2644 self.dump_pgs_not_active_down()
7c673cae 2645 assert time.time() - start < timeout, \
f6b5b4d7 2646 'wait_for_active_or_down: failed before timeout expired'
7c673cae
FG
2647 cur_active_down = self.get_num_active_down()
2648 if cur_active_down != num_active_down:
2649 start = time.time()
2650 num_active_down = cur_active_down
2651 time.sleep(3)
2652 self.log("active or down!")
2653
2654 def osd_is_up(self, osd):
2655 """
2656 Wrapper for osd check
2657 """
2658 osds = self.get_osd_dump()
2659 return osds[osd]['up'] > 0
2660
2661 def wait_till_osd_is_up(self, osd, timeout=None):
2662 """
2663 Loop waiting for osd.
2664 """
2665 self.log('waiting for osd.%d to be up' % osd)
2666 start = time.time()
2667 while not self.osd_is_up(osd):
2668 if timeout is not None:
2669 assert time.time() - start < timeout, \
2670 'osd.%d failed to come up before timeout expired' % osd
2671 time.sleep(3)
2672 self.log('osd.%d is up' % osd)
2673
2674 def is_active(self):
2675 """
2676 Wrapper to check if all pgs are active
2677 """
2678 return self.get_num_active() == self.get_num_pgs()
2679
9f95a23c
TL
2680 def all_active_or_peered(self):
2681 """
2682 Wrapper to check if all PGs are active or peered
2683 """
2684 pgs = self.get_pg_stats()
2685 return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)
2686
7c673cae
FG
2687 def wait_till_active(self, timeout=None):
2688 """
2689 Wait until all pgs are active.
2690 """
2691 self.log("waiting till active")
2692 start = time.time()
2693 while not self.is_active():
2694 if timeout is not None:
2695 if time.time() - start >= timeout:
f6b5b4d7
TL
2696 self.log('dumping pgs not active')
2697 self.dump_pgs_not_active()
7c673cae 2698 assert time.time() - start < timeout, \
f6b5b4d7 2699 'wait_till_active: failed before timeout expired'
7c673cae
FG
2700 time.sleep(3)
2701 self.log("active!")
2702
3efd9988
FG
2703 def wait_till_pg_convergence(self, timeout=None):
2704 start = time.time()
2705 old_stats = None
2706 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2707 if osd['in'] and osd['up']]
2708 while True:
2709 # strictly speaking, there is no need to wait for the mon. but due to
2710 # the "ms inject socket failures" setting, the osdmap could be delayed,
2711 # so the mgr is likely to ignore pg-stat messages for pgs serving
2712 # newly created pools that it does not yet know about. so, to make
2713 # sure the mgr is updated with the latest pg-stats, waiting for
2714 # mon/mgr is necessary.
2715 self.flush_pg_stats(active_osds)
2716 new_stats = dict((stat['pgid'], stat['state'])
2717 for stat in self.get_pg_stats())
2718 if old_stats == new_stats:
2719 return old_stats
2720 if timeout is not None:
2721 assert time.time() - start < timeout, \
2722 'failed to reach convergence before %d secs' % timeout
2723 old_stats = new_stats
2724 # longer than mgr_stats_period
2725 time.sleep(5 + 1)
2726
7c673cae
FG
2727 def mark_out_osd(self, osd):
2728 """
2729 Wrapper to mark osd out.
2730 """
2731 self.raw_cluster_cmd('osd', 'out', str(osd))
2732
2733 def kill_osd(self, osd):
2734 """
2735 Kill osds by either power cycling (if indicated by the config)
2736 or by stopping.
2737 """
2738 if self.config.get('powercycle'):
2739 remote = self.find_remote('osd', osd)
2740 self.log('kill_osd on osd.{o} '
2741 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2742 self._assert_ipmi(remote)
2743 remote.console.power_off()
2744 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2745 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
11fdf7f2
TL
2746 self.inject_args(
2747 'osd', osd,
2748 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
7c673cae
FG
2749 try:
2750 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2751 except:
2752 pass
2753 else:
2754 raise RuntimeError('osd.%s did not fail' % osd)
2755 else:
2756 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2757 else:
2758 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2759
2760 @staticmethod
2761 def _assert_ipmi(remote):
2762 assert remote.console.has_ipmi_credentials, (
2763 "powercycling requested but RemoteConsole is not "
2764 "initialized. Check ipmi config.")
2765
2766 def blackhole_kill_osd(self, osd):
2767 """
2768 Stop osd if nothing else works.
2769 """
11fdf7f2
TL
2770 self.inject_args('osd', osd,
2771 'objectstore-blackhole', True)
7c673cae
FG
2772 time.sleep(2)
2773 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2774
3efd9988 2775 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
7c673cae
FG
2776 """
2777 Revive osds by either power cycling (if indicated by the config)
2778 or by restarting.
2779 """
2780 if self.config.get('powercycle'):
2781 remote = self.find_remote('osd', osd)
2782 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2783 format(o=osd, s=remote.name))
2784 self._assert_ipmi(remote)
2785 remote.console.power_on()
2786 if not remote.console.check_status(300):
2787 raise Exception('Failed to revive osd.{o} via ipmi'.
2788 format(o=osd))
2789 teuthology.reconnect(self.ctx, 60, [remote])
2790 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2791 self.make_admin_daemon_dir(remote)
2792 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2793 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2794
2795 if not skip_admin_check:
2796 # wait for dump_ops_in_flight; this command doesn't appear
2797 # until after the signal handler is installed and it is safe
2798 # to stop the osd again without making valgrind leak checks
2799 # unhappy. see #5924.
2800 self.wait_run_admin_socket('osd', osd,
2801 args=['dump_ops_in_flight'],
2802 timeout=timeout, stdout=DEVNULL)
2803
2804 def mark_down_osd(self, osd):
2805 """
2806 Cluster command wrapper
2807 """
2808 self.raw_cluster_cmd('osd', 'down', str(osd))
2809
2810 def mark_in_osd(self, osd):
2811 """
2812 Cluster command wrapper
2813 """
2814 self.raw_cluster_cmd('osd', 'in', str(osd))
2815
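    # Illustrative sketch, not part of the upstream file: a minimal OSD
    # thrash cycle built from the helpers above -- stop an OSD, mark it
    # down, bring it back, and wait for the cluster to heal.  Timeouts are
    # arbitrary example values.
    def _example_thrash_one_osd(self, osd=0):
        self.kill_osd(osd)
        self.mark_down_osd(osd)
        self.revive_osd(osd)
        self.wait_till_osd_is_up(osd, timeout=300)
        self.wait_for_clean(timeout=1200)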
2816 def signal_osd(self, osd, sig, silent=False):
2817 """
2818 Wrapper to local get_daemon call which sends the given
2819 signal to the given osd.
2820 """
2821 self.ctx.daemons.get_daemon('osd', osd,
2822 self.cluster).signal(sig, silent=silent)
2823
2824 ## monitors
2825 def signal_mon(self, mon, sig, silent=False):
2826 """
11fdf7f2 2827 Wrapper to local get_daemon call
7c673cae
FG
2828 """
2829 self.ctx.daemons.get_daemon('mon', mon,
2830 self.cluster).signal(sig, silent=silent)
2831
2832 def kill_mon(self, mon):
2833 """
2834 Kill the monitor by either power cycling (if the config says so),
2835 or by doing a stop.
2836 """
2837 if self.config.get('powercycle'):
2838 remote = self.find_remote('mon', mon)
2839 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2840 format(m=mon, s=remote.name))
2841 self._assert_ipmi(remote)
2842 remote.console.power_off()
2843 else:
2844 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
2845
2846 def revive_mon(self, mon):
2847 """
2848 Restart by either power cycling (if the config says so),
2849 or by doing a normal restart.
2850 """
2851 if self.config.get('powercycle'):
2852 remote = self.find_remote('mon', mon)
2853 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
2854 format(m=mon, s=remote.name))
2855 self._assert_ipmi(remote)
2856 remote.console.power_on()
2857 self.make_admin_daemon_dir(remote)
2858 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
2859
31f18b77
FG
2860 def revive_mgr(self, mgr):
2861 """
2862 Restart by either power cycling (if the config says so),
2863 or by doing a normal restart.
2864 """
2865 if self.config.get('powercycle'):
2866 remote = self.find_remote('mgr', mgr)
2867 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
2868 format(m=mgr, s=remote.name))
2869 self._assert_ipmi(remote)
2870 remote.console.power_on()
2871 self.make_admin_daemon_dir(remote)
2872 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
2873
7c673cae
FG
2874 def get_mon_status(self, mon):
2875 """
2876 Extract all the monitor status information from the cluster
2877 """
9f95a23c 2878 out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
7c673cae
FG
2879 return json.loads(out)
2880
2881 def get_mon_quorum(self):
2882 """
2883 Extract monitor quorum information from the cluster
2884 """
2885 out = self.raw_cluster_cmd('quorum_status')
2886 j = json.loads(out)
2887 self.log('quorum_status is %s' % out)
2888 return j['quorum']
2889
2890 def wait_for_mon_quorum_size(self, size, timeout=300):
2891 """
2892 Loop until quorum size is reached.
2893 """
2894 self.log('waiting for quorum size %d' % size)
2895 start = time.time()
2896 while not len(self.get_mon_quorum()) == size:
2897 if timeout is not None:
2898 assert time.time() - start < timeout, \
2899 ('failed to reach quorum size %d '
2900 'before timeout expired' % size)
2901 time.sleep(3)
2902 self.log("quorum is size %d" % size)
2903
2904 def get_mon_health(self, debug=False):
2905 """
2906 Extract all the monitor health information.
2907 """
2908 out = self.raw_cluster_cmd('health', '--format=json')
2909 if debug:
2910 self.log('health:\n{h}'.format(h=out))
2911 return json.loads(out)
2912
9f95a23c
TL
2913 def wait_until_healthy(self, timeout=None):
2914 self.log("wait_until_healthy")
2915 start = time.time()
2916 while self.get_mon_health()['status'] != 'HEALTH_OK':
2917 if timeout is not None:
2918 assert time.time() - start < timeout, \
2919 'timeout expired in wait_until_healthy'
2920 time.sleep(3)
2921 self.log("wait_until_healthy done")
2922
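    # Illustrative sketch, not part of the upstream file: after reviving a
    # monitor, wait for quorum to reach the expected size and for the cluster
    # to return to HEALTH_OK.  Timeouts are arbitrary example values.
    def _example_wait_mon_recovered(self, expected_quorum_size=3):
        self.wait_for_mon_quorum_size(expected_quorum_size, timeout=300)
        self.wait_until_healthy(timeout=300)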
7c673cae
FG
2923 def get_filepath(self):
2924 """
2925 Return path to osd data with {id} needing to be replaced
2926 """
2927 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
2928
2929 def make_admin_daemon_dir(self, remote):
2930 """
2931 Create /var/run/ceph directory on remote site.
2932
2933 :param ctx: Context
2934 :param remote: Remote site
2935 """
2936 remote.run(args=['sudo',
2937 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
2938
9f95a23c
TL
2939 def get_service_task_status(self, service, status_key):
2940 """
2941 Return daemon task status for a given ceph service.
2942
2943 :param service: ceph service (mds, osd, etc...)
2944 :param status_key: matching task status key
2945 """
2946 task_status = {}
2947 status = self.raw_cluster_status()
2948 try:
2949 for k,v in status['servicemap']['services'][service]['daemons'].items():
2950 ts = dict(v).get('task_status', None)
2951 if ts:
2952 task_status[k] = ts[status_key]
2953 except KeyError: # catches missing service and status key
2954 return {}
2955 self.log(task_status)
2956 return task_status
7c673cae
FG
2957
2958def utility_task(name):
2959 """
2960 Generate ceph_manager subtask corresponding to ceph_manager
2961 method name
2962 """
2963 def task(ctx, config):
2964 if config is None:
2965 config = {}
2966 args = config.get('args', [])
2967 kwargs = config.get('kwargs', {})
2968 cluster = config.get('cluster', 'ceph')
2969 fn = getattr(ctx.managers[cluster], name)
2970 fn(*args, **kwargs)
2971 return task
2972
2973revive_osd = utility_task("revive_osd")
2974revive_mon = utility_task("revive_mon")
2975kill_osd = utility_task("kill_osd")
2976kill_mon = utility_task("kill_mon")
2977create_pool = utility_task("create_pool")
2978remove_pool = utility_task("remove_pool")
2979wait_for_clean = utility_task("wait_for_clean")
c07f9fc5 2980flush_all_pg_stats = utility_task("flush_all_pg_stats")
7c673cae
FG
2981set_pool_property = utility_task("set_pool_property")
2982do_pg_scrub = utility_task("do_pg_scrub")
c07f9fc5
FG
2983wait_for_pool = utility_task("wait_for_pool")
2984wait_for_pools = utility_task("wait_for_pools")
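# Illustrative sketch, not part of the upstream file: the names above are
# teuthology tasks, so a job yaml fragment can invoke them directly (the
# exact task path and the pool names below are examples only), e.g.:
#
#   tasks:
#   - ceph:
#   - ceph_manager.create_pool:
#       args: ['example-pool']
#   - ceph_manager.wait_for_clean:
#   - ceph_manager.set_pool_property:
#       args: ['example-pool', 'min_size', 2]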