diff --git a/ceph/qa/tasks/ceph_manager.py b/ceph/qa/tasks/ceph_manager.py
index edbb2ae3ec6384c6812e8fa7255f6aca6c84a324..c058735aa8a27d9aee0b0a5869e62ee0f37f1e30 100644
--- a/ceph/qa/tasks/ceph_manager.py
+++ b/ceph/qa/tasks/ceph_manager.py
@@ -1,7 +1,6 @@
 """
 ceph manager -- Thrasher and CephManager objects
 """
-from cStringIO import StringIO
 from functools import wraps
 import contextlib
 import random
@@ -14,40 +13,61 @@ import logging
 import threading
 import traceback
 import os
+import six
+
+from io import BytesIO
+from six import StringIO
 from teuthology import misc as teuthology
 from tasks.scrub import Scrubber
-from util.rados import cmd_erasure_code_profile
-from util import get_remote
+from tasks.util.rados import cmd_erasure_code_profile
+from tasks.util import get_remote
 from teuthology.contextutil import safe_while
 from teuthology.orchestra.remote import Remote
 from teuthology.orchestra import run
 from teuthology.exceptions import CommandFailedError
+from tasks.thrasher import Thrasher
 
 try:
     from subprocess import DEVNULL # py3k
 except ImportError:
-    DEVNULL = open(os.devnull, 'r+')
+    DEVNULL = open(os.devnull, 'r+')  # type: ignore
 
 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
 
 log = logging.getLogger(__name__)
 
+# this is for cephadm clusters
+def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
+    testdir = teuthology.get_testdir(ctx)
+    extra_args = []
+    if name:
+        extra_args = ['-n', name]
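+    # effectively runs: sudo <cephadm> --image <image> shell [-n <name>] --fsid <fsid> -- <args>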
+    return remote.run(
+        args=[
+            'sudo',
+            ctx.cephadm,
+            '--image', ctx.ceph[cluster_name].image,
+            'shell',
+        ] + extra_args + [
+            '--fsid', ctx.ceph[cluster_name].fsid,
+            '--',
+        ] + args,
+        **kwargs
+    )
 
 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
-    conf_fp = StringIO()
+    conf_fp = BytesIO()
     ctx.ceph[cluster].conf.write(conf_fp)
     conf_fp.seek(0)
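+    # stream the rendered conf to every remote over stdin; 'sudo tee' writes it
+    # to conf_path and its own stdout is discarded via the trailing > /dev/null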
     writes = ctx.cluster.run(
         args=[
             'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
             'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
-            'sudo', 'python',
-            '-c',
-            ('import shutil, sys; '
-             'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
-            conf_path,
-            run.Raw('&&'),
+            'sudo', 'tee', conf_path, run.Raw('&&'),
             'sudo', 'chmod', '0644', conf_path,
+            run.Raw('>'), '/dev/null',
         ],
         stdin=run.PIPE,
         wait=False)
@@ -95,11 +115,29 @@ def mount_osd_data(ctx, remote, cluster, osd):
             )
 
 
-class Thrasher:
+def log_exc(func):
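+    """Decorator: log the traceback of any exception raised by the wrapped method, then re-raise."""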
+    @wraps(func)
+    def wrapper(self):
+        try:
+            return func(self)
+        except:
+            self.log(traceback.format_exc())
+            raise
+    return wrapper
+
+
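+# pool type values as reported in the OSD map's pool dump ('type' field):
+# 1 = replicated, 3 = erasure-coded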
+class PoolType:
+    REPLICATED = 1
+    ERASURE_CODED = 3
+
+
+class OSDThrasher(Thrasher):
     """
     Object used to thrash Ceph
     """
-    def __init__(self, manager, config, logger=None):
+    def __init__(self, manager, config, name, logger):
+        super(OSDThrasher, self).__init__()
+
         self.ceph_manager = manager
         self.cluster = manager.cluster
         self.ceph_manager.wait_for_clean()
@@ -111,6 +149,7 @@ class Thrasher:
         self.stopping = False
         self.logger = logger
         self.config = config
+        self.name = name
         self.revive_timeout = self.config.get("revive_timeout", 360)
         self.pools_to_fix_pgp_num = set()
         if self.config.get('powercycle'):
@@ -129,16 +168,8 @@ class Thrasher:
         self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
 
         num_osds = self.in_osds + self.out_osds
-        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
-        if self.logger is not None:
-            self.log = lambda x: self.logger.info(x)
-        else:
-            def tmp(x):
-                """
-                Implement log behavior
-                """
-                print x
-            self.log = tmp
+        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
+        self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
         if self.config is None:
             self.config = dict()
         # prevent monitor from auto-marking things out while thrasher runs
@@ -148,19 +179,19 @@ class Thrasher:
         # another
         first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
         opts = [('mon', 'mon_osd_down_out_interval', 0)]
+        #why do we disable marking an OSD out automatically? :/
         for service, opt, new_value in opts:
             old_value = manager.get_config(first_mon[0],
                                            first_mon[1],
                                            opt)
             self.saved_options.append((service, opt, old_value))
-            self._set_config(service, '*', opt, new_value)
+            manager.inject_args(service, '*', opt, new_value)
         # initialize ceph_objectstore_tool property - must be done before
         # do_thrash is spawned - http://tracker.ceph.com/issues/18799
         if (self.config.get('powercycle') or
             not self.cmd_exists_on_osds("ceph-objectstore-tool") or
             self.config.get('disable_objectstore_tool_tests', False)):
             self.ceph_objectstore_tool = False
-            self.test_rm_past_intervals = False
             if self.config.get('powercycle'):
                 self.log("Unable to test ceph-objectstore-tool, "
                          "powercycle testing")
@@ -170,8 +201,6 @@ class Thrasher:
         else:
             self.ceph_objectstore_tool = \
                 self.config.get('ceph_objectstore_tool', True)
-            self.test_rm_past_intervals = \
-                self.config.get('test_rm_past_intervals', True)
         # spawn do_thrash
         self.thread = gevent.spawn(self.do_thrash)
         if self.sighup_delay:
@@ -183,25 +212,39 @@ class Thrasher:
         if self.noscrub_toggle_delay:
             self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
 
-    def _set_config(self, service_type, service_id, name, value):
-        opt_arg = '--{name} {value}'.format(name=name, value=value)
-        whom = '.'.join([service_type, service_id])
-        self.ceph_manager.raw_cluster_cmd('--', 'tell', whom,
-                                          'injectargs', opt_arg)
-
+    def log(self, msg, *args, **kwargs):
+        self.logger.info(msg, *args, **kwargs)
 
     def cmd_exists_on_osds(self, cmd):
+        if self.ceph_manager.cephadm:
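+            # cephadm runs the tool through 'cephadm shell' (see
+            # run_ceph_objectstore_tool below), so assume it is available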
+            return True
         allremotes = self.ceph_manager.ctx.cluster.only(\
             teuthology.is_type('osd', self.cluster)).remotes.keys()
         allremotes = list(set(allremotes))
         for remote in allremotes:
             proc = remote.run(args=['type', cmd], wait=True,
-                              check_status=False, stdout=StringIO(),
-                              stderr=StringIO())
+                              check_status=False, stdout=BytesIO(),
+                              stderr=BytesIO())
             if proc.exitstatus != 0:
                 return False;
         return True;
 
+    def run_ceph_objectstore_tool(self, remote, osd, cmd):
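+        """
+        Run ceph-objectstore-tool: inside the cephadm shell container for
+        cephadm-managed clusters, otherwise directly on the remote under
+        adjust-ulimits.
+        """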
+        if self.ceph_manager.cephadm:
+            return shell(
+                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
+                args=['ceph-objectstore-tool'] + cmd,
+                name=osd,
+                wait=True, check_status=False,
+                stdout=StringIO(),
+                stderr=StringIO())
+        else:
+            return remote.run(
+                args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd,
+                wait=True, check_status=False,
+                stdout=StringIO(),
+                stderr=StringIO())
+
     def kill_osd(self, osd=None, mark_down=False, mark_out=False):
         """
         :param osd: Osd to be killed.
@@ -220,84 +263,103 @@ class Thrasher:
         if mark_out and osd in self.in_osds:
             self.out_osd(osd)
         if self.ceph_objectstore_tool:
-            self.log("Testing ceph-objectstore-tool on down osd")
+            self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
             remote = self.ceph_manager.find_remote('osd', osd)
             FSPATH = self.ceph_manager.get_filepath()
             JPATH = os.path.join(FSPATH, "journal")
             exp_osd = imp_osd = osd
+            self.log('remote for osd %s is %s' % (osd, remote))
             exp_remote = imp_remote = remote
             # If an older osd is available we'll move a pg from there
             if (len(self.dead_osds) > 1 and
                     random.random() < self.chance_move_pg):
                 exp_osd = random.choice(self.dead_osds[:-1])
                 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
-            if ('keyvaluestore_backend' in
-                    self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
-                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
-                          "--data-path {fpath} --journal-path {jpath} "
-                          "--type keyvaluestore "
-                          "--log-file="
-                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
-                          format(fpath=FSPATH, jpath=JPATH))
-            else:
-                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
-                          "--data-path {fpath} --journal-path {jpath} "
-                          "--log-file="
-                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
-                          format(fpath=FSPATH, jpath=JPATH))
-            cmd = (prefix + "--op list-pgs").format(id=exp_osd)
-
-            # ceph-objectstore-tool might be temporarily absent during an 
-            # upgrade - see http://tracker.ceph.com/issues/18014
-            with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
-                while proceed():
-                    proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'], 
-                               wait=True, check_status=False, stdout=StringIO(),
-                               stderr=StringIO())
-                    if proc.exitstatus == 0:
-                        break
-                    log.debug("ceph-objectstore-tool binary not present, trying again")
+                self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
+            prefix = [
+                '--no-mon-config',
+                '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
+            ]
+
+            if not self.ceph_manager.cephadm:
+                # ceph-objectstore-tool might be temporarily absent during an
+                # upgrade - see http://tracker.ceph.com/issues/18014
+                with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
+                    while proceed():
+                        proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
+                                              wait=True, check_status=False, stdout=BytesIO(),
+                                              stderr=BytesIO())
+                        if proc.exitstatus == 0:
+                            break
+                        log.debug("ceph-objectstore-tool binary not present, trying again")
 
             # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
             # see http://tracker.ceph.com/issues/19556
             with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
                 while proceed():
-                    proc = exp_remote.run(args=cmd, wait=True,
-                                          check_status=False,
-                                          stdout=StringIO(), stderr=StringIO())
+                    proc = self.run_ceph_objectstore_tool(
+                        exp_remote, 'osd.%s' % exp_osd,
+                        prefix + [
+                            '--data-path', FSPATH.format(id=exp_osd),
+                            '--journal-path', JPATH.format(id=exp_osd),
+                            '--op', 'list-pgs',
+                        ])
                     if proc.exitstatus == 0:
                         break
-                    elif proc.exitstatus == 1 and proc.stderr == "OSD has the store locked":
+                    elif (proc.exitstatus == 1 and
+                          proc.stderr.getvalue() == "OSD has the store locked"):
                         continue
                     else:
                         raise Exception("ceph-objectstore-tool: "
                                         "exp list-pgs failure with status {ret}".
                                         format(ret=proc.exitstatus))
 
-            pgs = proc.stdout.getvalue().split('\n')[:-1]
+            pgs = six.ensure_str(proc.stdout.getvalue()).split('\n')[:-1]
             if len(pgs) == 0:
                 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
                 return
             pg = random.choice(pgs)
-            exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
-            exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
-            exp_path = os.path.join(exp_path,
+            #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
+            #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
+            exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
                                     "exp.{pg}.{id}".format(
                                         pg=pg,
                                         id=exp_osd))
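+            # with cephadm, the container's /var/log/ceph is the host's
+            # /var/log/ceph/<fsid>, so keep track of the host-side path too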
+            if self.ceph_manager.cephadm:
+                exp_host_path = os.path.join(
+                    '/var/log/ceph',
+                    self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
+                    "exp.{pg}.{id}".format(
+                        pg=pg,
+                        id=exp_osd))
+            else:
+                exp_host_path = exp_path
+
             # export
             # Can't use new export-remove op since this is part of upgrade testing
-            cmd = prefix + "--op export --pgid {pg} --file {file}"
-            cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
-            proc = exp_remote.run(args=cmd)
+            proc = self.run_ceph_objectstore_tool(
+                exp_remote, 'osd.%s' % exp_osd,
+                prefix + [
+                    '--data-path', FSPATH.format(id=exp_osd),
+                    '--journal-path', JPATH.format(id=exp_osd),
+                    '--op', 'export',
+                    '--pgid', pg,
+                    '--file', exp_path,
+                ])
             if proc.exitstatus:
                 raise Exception("ceph-objectstore-tool: "
                                 "export failure with status {ret}".
                                 format(ret=proc.exitstatus))
             # remove
-            cmd = prefix + "--force --op remove --pgid {pg}"
-            cmd = cmd.format(id=exp_osd, pg=pg)
-            proc = exp_remote.run(args=cmd)
+            proc = self.run_ceph_objectstore_tool(
+                exp_remote, 'osd.%s' % exp_osd,
+                prefix + [
+                    '--data-path', FSPATH.format(id=exp_osd),
+                    '--journal-path', JPATH.format(id=exp_osd),
+                    '--force',
+                    '--op', 'remove',
+                    '--pgid', pg,
+                ])
             if proc.exitstatus:
                 raise Exception("ceph-objectstore-tool: "
                                 "remove failure with status {ret}".
@@ -305,14 +367,19 @@ class Thrasher:
             # If there are at least 2 dead osds we might move the pg
             if exp_osd != imp_osd:
                 # If pg isn't already on this osd, then we will move it there
-                cmd = (prefix + "--op list-pgs").format(id=imp_osd)
-                proc = imp_remote.run(args=cmd, wait=True,
-                                      check_status=False, stdout=StringIO())
+                proc = self.run_ceph_objectstore_tool(
+                    imp_remote,
+                    'osd.%s' % imp_osd,
+                    prefix + [
+                        '--data-path', FSPATH.format(id=imp_osd),
+                        '--journal-path', JPATH.format(id=imp_osd),
+                        '--op', 'list-pgs',
+                    ])
                 if proc.exitstatus:
                     raise Exception("ceph-objectstore-tool: "
                                     "imp list-pgs failure with status {ret}".
                                     format(ret=proc.exitstatus))
-                pgs = proc.stdout.getvalue().split('\n')[:-1]
+                pgs = six.ensure_str(proc.stdout.getvalue()).split('\n')[:-1]
                 if pg not in pgs:
                     self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
                              format(pg=pg, fosd=exp_osd, tosd=imp_osd))
@@ -320,18 +387,36 @@ class Thrasher:
                         # Copy export file to the other machine
                         self.log("Transfer export file from {srem} to {trem}".
                                  format(srem=exp_remote, trem=imp_remote))
-                        tmpexport = Remote.get_file(exp_remote, exp_path)
-                        Remote.put_file(imp_remote, tmpexport, exp_path)
+                        # just in case an upgrade makes /var/log/ceph unreadable by non-root,
+                        exp_remote.run(args=['sudo', 'chmod', '777',
+                                             '/var/log/ceph'])
+                        imp_remote.run(args=['sudo', 'chmod', '777',
+                                             '/var/log/ceph'])
+                        tmpexport = Remote.get_file(exp_remote, exp_host_path,
+                                                    sudo=True)
+                        if exp_host_path != exp_path:
+                            # push to /var/log/ceph, then rename (we can't
+                            # chmod 777 the /var/log/ceph/$fsid mountpoint)
+                            Remote.put_file(imp_remote, tmpexport, exp_path)
+                            imp_remote.run(args=[
+                                'sudo', 'mv', exp_path, exp_host_path])
+                        else:
+                            Remote.put_file(imp_remote, tmpexport, exp_host_path)
                         os.remove(tmpexport)
                 else:
                     # Can't move the pg after all
                     imp_osd = exp_osd
                     imp_remote = exp_remote
             # import
-            cmd = (prefix + "--op import --file {file}")
-            cmd = cmd.format(id=imp_osd, file=exp_path)
-            proc = imp_remote.run(args=cmd, wait=True, check_status=False,
-                                  stderr=StringIO())
+            proc = self.run_ceph_objectstore_tool(
+                imp_remote, 'osd.%s' % imp_osd,
+                [
+                    '--data-path', FSPATH.format(id=imp_osd),
+                    '--journal-path', JPATH.format(id=imp_osd),
+                    '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
+                    '--op', 'import',
+                    '--file', exp_path,
+                ])
             if proc.exitstatus == 1:
                 bogosity = "The OSD you are using is older than the exported PG"
                 if bogosity in proc.stderr.getvalue():
@@ -343,73 +428,41 @@ class Thrasher:
             elif proc.exitstatus == 11:
                 self.log("Attempt to import an incompatible export"
                          "...ignored")
+            elif proc.exitstatus == 12:
+                # this should be safe to ignore because we only ever move 1
+                # copy of the pg at a time, and merge is only initiated when
+                # all replicas are peered and happy.  /me crosses fingers
+                self.log("PG merged on target"
+                         "...ignored")
             elif proc.exitstatus:
                 raise Exception("ceph-objectstore-tool: "
                                 "import failure with status {ret}".
                                 format(ret=proc.exitstatus))
-            cmd = "rm -f {file}".format(file=exp_path)
+            cmd = "sudo rm -f {file}".format(file=exp_host_path)
             exp_remote.run(args=cmd)
             if imp_remote != exp_remote:
                 imp_remote.run(args=cmd)
 
             # apply low split settings to each pool
-            for pool in self.ceph_manager.list_pools():
-                no_sudo_prefix = prefix[5:]
-                cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
-                       "--filestore-split-multiple 1' sudo -E "
-                       + no_sudo_prefix + "--op apply-layout-settings --pool " + pool).format(id=osd)
-                proc = remote.run(args=cmd, wait=True, check_status=False, stderr=StringIO())
-                output = proc.stderr.getvalue()
-                if 'Couldn\'t find pool' in output:
-                    continue
-                if proc.exitstatus:
-                    raise Exception("ceph-objectstore-tool apply-layout-settings"
-                                    " failed with {status}".format(status=proc.exitstatus))
+            if not self.ceph_manager.cephadm:
+                for pool in self.ceph_manager.list_pools():
+                    cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
+                           "--filestore-split-multiple 1' sudo -E "
+                           + 'ceph-objectstore-tool '
+                           + ' '.join(prefix + [
+                               '--data-path', FSPATH.format(id=imp_osd),
+                               '--journal-path', JPATH.format(id=imp_osd),
+                           ])
+                           + " --op apply-layout-settings --pool " + pool).format(id=osd)
+                    proc = imp_remote.run(args=cmd,
+                                          wait=True, check_status=False,
+                                          stderr=StringIO())
+                    if 'Couldn\'t find pool' in proc.stderr.getvalue():
+                        continue
+                    if proc.exitstatus:
+                        raise Exception("ceph-objectstore-tool apply-layout-settings"
+                                        " failed with {status}".format(status=proc.exitstatus))
 
-    def rm_past_intervals(self, osd=None):
-        """
-        :param osd: Osd to find pg to remove past intervals
-        """
-        if self.test_rm_past_intervals:
-            if osd is None:
-                osd = random.choice(self.dead_osds)
-            self.log("Use ceph_objectstore_tool to remove past intervals")
-            remote = self.ceph_manager.find_remote('osd', osd)
-            FSPATH = self.ceph_manager.get_filepath()
-            JPATH = os.path.join(FSPATH, "journal")
-            if ('keyvaluestore_backend' in
-                    self.ceph_manager.ctx.ceph[self.cluster].conf['osd']):
-                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
-                          "--data-path {fpath} --journal-path {jpath} "
-                          "--type keyvaluestore "
-                          "--log-file="
-                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
-                          format(fpath=FSPATH, jpath=JPATH))
-            else:
-                prefix = ("sudo adjust-ulimits ceph-objectstore-tool "
-                          "--data-path {fpath} --journal-path {jpath} "
-                          "--log-file="
-                          "/var/log/ceph/objectstore_tool.\\$pid.log ".
-                          format(fpath=FSPATH, jpath=JPATH))
-            cmd = (prefix + "--op list-pgs").format(id=osd)
-            proc = remote.run(args=cmd, wait=True,
-                              check_status=False, stdout=StringIO())
-            if proc.exitstatus:
-                raise Exception("ceph_objectstore_tool: "
-                                "exp list-pgs failure with status {ret}".
-                                format(ret=proc.exitstatus))
-            pgs = proc.stdout.getvalue().split('\n')[:-1]
-            if len(pgs) == 0:
-                self.log("No PGs found for osd.{osd}".format(osd=osd))
-                return
-            pg = random.choice(pgs)
-            cmd = (prefix + "--op rm-past-intervals --pgid {pg}").\
-                format(id=osd, pg=pg)
-            proc = remote.run(args=cmd)
-            if proc.exitstatus:
-                raise Exception("ceph_objectstore_tool: "
-                                "rm-past-intervals failure with status {ret}".
-                                format(ret=proc.exitstatus))
 
     def blackhole_kill_osd(self, osd=None):
         """
@@ -438,11 +491,11 @@ class Thrasher:
             skip_admin_check=skip_admin_check)
         self.dead_osds.remove(osd)
         self.live_osds.append(osd)
-        if self.random_eio > 0 and osd is self.rerrosd:
-            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
-                          'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
-            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
-                          'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
+        if self.random_eio > 0 and osd == self.rerrosd:
+            self.ceph_manager.set_config(self.rerrosd,
+                                         filestore_debug_random_read_err = self.random_eio)
+            self.ceph_manager.set_config(self.rerrosd,
+                                         bluestore_debug_random_read_err = self.random_eio)
 
 
     def out_osd(self, osd=None):
@@ -536,6 +589,8 @@ class Thrasher:
         try:
             if random.random() >= .3:
                 pgs = self.ceph_manager.get_pg_stats()
+                if not pgs:
+                    return
                 pg = random.choice(pgs)
                 pgid = str(pg['pgid'])
                 poolid = int(pgid.split('.')[0])
@@ -576,6 +631,8 @@ class Thrasher:
         try:
             if random.random() >= .3:
                 pgs = self.ceph_manager.get_pg_stats()
+                if not pgs:
+                    return
                 pg = random.choice(pgs)
                 pgid = str(pg['pgid'])
                 poolid = int(pgid.split('.')[0])
@@ -691,19 +748,37 @@ class Thrasher:
         Increase the size of the pool
         """
         pool = self.ceph_manager.get_pool()
-        orig_pg_num = self.ceph_manager.get_pool_pg_num(pool)
+        if pool is None:
+            return
         self.log("Growing pool %s" % (pool,))
         if self.ceph_manager.expand_pool(pool,
                                          self.config.get('pool_grow_by', 10),
                                          self.max_pgs):
             self.pools_to_fix_pgp_num.add(pool)
 
+    def shrink_pool(self):
+        """
+        Decrease the size of the pool
+        """
+        pool = self.ceph_manager.get_pool()
+        if pool is None:
+            return
+        _ = self.ceph_manager.get_pool_pg_num(pool)
+        self.log("Shrinking pool %s" % (pool,))
+        if self.ceph_manager.contract_pool(
+                pool,
+                self.config.get('pool_shrink_by', 10),
+                self.min_pgs):
+            self.pools_to_fix_pgp_num.add(pool)
+
     def fix_pgp_num(self, pool=None):
         """
         Fix number of pgs in pool.
         """
         if pool is None:
             pool = self.ceph_manager.get_pool()
+            if not pool:
+                return
             force = False
         else:
             force = True
@@ -713,27 +788,110 @@ class Thrasher:
 
     def test_pool_min_size(self):
         """
-        Kill and revive all osds except one.
+        Loop to selectively push PGs below their min_size and test that recovery
+        still occurs.
         """
         self.log("test_pool_min_size")
         self.all_up()
         self.ceph_manager.wait_for_recovery(
             timeout=self.config.get('timeout')
             )
-        the_one = random.choice(self.in_osds)
-        self.log("Killing everyone but %s", the_one)
-        to_kill = filter(lambda x: x != the_one, self.in_osds)
-        [self.kill_osd(i) for i in to_kill]
-        [self.out_osd(i) for i in to_kill]
-        time.sleep(self.config.get("test_pool_min_size_time", 10))
-        self.log("Killing %s" % (the_one,))
-        self.kill_osd(the_one)
-        self.out_osd(the_one)
-        self.log("Reviving everyone but %s" % (the_one,))
-        [self.revive_osd(i) for i in to_kill]
-        [self.in_osd(i) for i in to_kill]
-        self.log("Revived everyone but %s" % (the_one,))
-        self.log("Waiting for clean")
+
+        minout = int(self.config.get("min_out", 1))
+        minlive = int(self.config.get("min_live", 2))
+        mindead = int(self.config.get("min_dead", 1))
+        self.log("doing min_size thrashing")
+        self.ceph_manager.wait_for_clean(timeout=60)
+        assert self.ceph_manager.is_clean(), \
+            'not clean before minsize thrashing starts'
+        while not self.stopping:
+            # look up k and m from all the pools on each loop, in case it
+            # changes as the cluster runs
+            k = 0
+            m = 99
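+            # k ends up as the largest EC k across pools (at least that many
+            # OSDs must stay up); m as the smallest EC m (how many we can kill)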
+            has_pools = False
+            pools_json = self.ceph_manager.get_osd_dump_json()['pools']
+
+            for pool_json in pools_json:
+                pool = pool_json['pool_name']
+                has_pools = True
+                pool_type = pool_json['type']  # 1 for rep, 3 for ec
+                min_size = pool_json['min_size']
+                self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
+                try:
+                    ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
+                    if pool_type != PoolType.ERASURE_CODED:
+                        continue
+                    ec_profile = pool_json['erasure_code_profile']
+                    ec_profile_json = self.ceph_manager.raw_cluster_cmd(
+                        'osd',
+                        'erasure-code-profile',
+                        'get',
+                        ec_profile,
+                        '--format=json')
+                    ec_json = json.loads(ec_profile_json)
+                    local_k = int(ec_json['k'])
+                    local_m = int(ec_json['m'])
+                    self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
+                                                                          k=local_k, m=local_m))
+                    if local_k > k:
+                        self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
+                        k = local_k
+                    if local_m < m:
+                        self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
+                        m = local_m
+                except CommandFailedError:
+                    self.log("failed to read erasure_code_profile. %s was likely removed", pool)
+                    continue
+
+            if has_pools:
+                self.log("using k={k}, m={m}".format(k=k,m=m))
+            else:
+                self.log("No pools yet, waiting")
+                time.sleep(5)
+                continue
+                
+            if minout > len(self.out_osds): # kill OSDs and mark out
+                self.log("forced to out an osd")
+                self.kill_osd(mark_out=True)
+                continue
+            elif mindead > len(self.dead_osds): # kill OSDs but force timeout
+                self.log("forced to kill an osd")
+                self.kill_osd()
+                continue
+            else: # make mostly-random choice to kill or revive OSDs
+                minup = max(minlive, k)
+                rand_val = random.uniform(0, 1)
+                self.log("choosing based on number of live OSDs and rand val {rand}".\
+                         format(rand=rand_val))
+                if len(self.live_osds) > minup+1 and rand_val < 0.5:
+                    # chose to knock out as many OSDs as we can w/out downing PGs
+                    
+                    most_killable = min(len(self.live_osds) - minup, m)
+                    self.log("chose to kill {n} OSDs".format(n=most_killable))
+                    for i in range(1, most_killable):
+                        self.kill_osd(mark_out=True)
+                    time.sleep(10)
+                    # try a few times since there might be a concurrent pool
+                    # creation or deletion
+                    with safe_while(
+                            sleep=5, tries=5,
+                            action='check for active or peered') as proceed:
+                        while proceed():
+                            if self.ceph_manager.all_active_or_peered():
+                                break
+                            self.log('not all PGs are active or peered')
+                else: # chose to revive OSDs, bring up a random fraction of the dead ones
+                    self.log("chose to revive osds")
+                    for i in range(1, int(rand_val * len(self.dead_osds))):
+                        self.revive_osd(i)
+
+            # let PGs repair themselves or our next knockout might kill one
+            self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
+        # / while not self.stopping
+        self.all_up_in()
         self.ceph_manager.wait_for_recovery(
             timeout=self.config.get('timeout')
             )
@@ -841,15 +999,15 @@ class Thrasher:
         Random action selector.
         """
         chance_down = self.config.get('chance_down', 0.4)
-        chance_test_min_size = self.config.get('chance_test_min_size', 0)
+        _ = self.config.get('chance_test_min_size', 0)
         chance_test_backfill_full = \
             self.config.get('chance_test_backfill_full', 0)
         if isinstance(chance_down, int):
             chance_down = float(chance_down) / 100
         minin = self.minin
-        minout = self.config.get("min_out", 0)
-        minlive = self.config.get("min_live", 2)
-        mindead = self.config.get("min_dead", 0)
+        minout = int(self.config.get("min_out", 0))
+        minlive = int(self.config.get("min_live", 2))
+        mindead = int(self.config.get("min_dead", 0))
 
         self.log('choose_action: min_in %d min_out '
                  '%d min_live %d min_dead %d' %
@@ -859,8 +1017,6 @@ class Thrasher:
             actions.append((self.out_osd, 1.0,))
         if len(self.live_osds) > minlive and chance_down > 0:
             actions.append((self.kill_osd, chance_down,))
-        if len(self.dead_osds) > 1:
-            actions.append((self.rm_past_intervals, 1.0,))
         if len(self.out_osds) > minout:
             actions.append((self.in_osd, 1.7,))
         if len(self.dead_osds) > mindead:
@@ -871,10 +1027,12 @@ class Thrasher:
                         self.config.get('reweight_osd', .5),))
         actions.append((self.grow_pool,
                         self.config.get('chance_pgnum_grow', 0),))
+        actions.append((self.shrink_pool,
+                        self.config.get('chance_pgnum_shrink', 0),))
         actions.append((self.fix_pgp_num,
                         self.config.get('chance_pgpnum_fix', 0),))
         actions.append((self.test_pool_min_size,
-                        chance_test_min_size,))
+                        self.config.get('chance_test_min_size', 0),))
         actions.append((self.test_backfill_full,
                         chance_test_backfill_full,))
         if self.chance_thrash_cluster_full > 0:
@@ -910,15 +1068,18 @@ class Thrasher:
             val -= prob
         return None
 
-    def log_exc(func):
-        @wraps(func)
-        def wrapper(self):
-            try:
-                return func(self)
-            except:
-                self.log(traceback.format_exc())
-                raise
-        return wrapper
+    def do_thrash(self):
+        """
+        _do_thrash() wrapper.
+        """
+        try:
+            self._do_thrash()
+        except Exception as e:
+            # See _run exception comment for MDSThrasher
+            self.set_thrasher_exception(e)
+            self.logger.exception("exception:")
+            # Allow successful completion so gevent doesn't see an exception.
+            # The DaemonWatchdog will observe the error and tear down the test.
 
     @log_exc
     def do_sighup(self):
@@ -949,8 +1110,12 @@ class Thrasher:
                 osd_state = "false"
             else:
                 osd_state = "true"
-            self.ceph_manager.raw_cluster_cmd_result('tell', 'osd.*',
-                             'injectargs', '--osd_enable_op_tracker=%s' % osd_state)
+            try:
+                self.ceph_manager.inject_args('osd', '*',
+                                              'osd_enable_op_tracker',
+                                              osd_state)
+            except CommandFailedError:
+                self.log('Failed to tell all osds, ignoring')
             gevent.sleep(delay)
 
     @log_exc
@@ -998,7 +1163,7 @@ class Thrasher:
         self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
 
     @log_exc
-    def do_thrash(self):
+    def _do_thrash(self):
         """
         Loop to select random actions to thrash ceph manager with.
         """
@@ -1008,10 +1173,12 @@ class Thrasher:
         delay = self.config.get("op_delay", 5)
         self.rerrosd = self.live_osds[0]
         if self.random_eio > 0:
-            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
-                          'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
-            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
-                          'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
+            self.ceph_manager.inject_args('osd', self.rerrosd,
+                                          'filestore_debug_random_read_err',
+                                          self.random_eio)
+            self.ceph_manager.inject_args('osd', self.rerrosd,
+                                          'bluestore_debug_random_read_err',
+                                          self.random_eio)
         self.log("starting do_thrash")
         while not self.stopping:
             to_log = [str(x) for x in ["in_osds: ", self.in_osds,
@@ -1026,7 +1193,8 @@ class Thrasher:
                     self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                       str(osd), str(1))
                 if random.uniform(0, 1) < float(
-                        self.config.get('chance_test_map_discontinuity', 0)):
+                        self.config.get('chance_test_map_discontinuity', 0)) \
+                        and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
                     self.test_map_discontinuity()
                 else:
                     self.ceph_manager.wait_for_recovery(
@@ -1041,16 +1209,16 @@ class Thrasher:
             time.sleep(delay)
         self.all_up()
         if self.random_eio > 0:
-            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
-                          'injectargs', '--', '--filestore_debug_random_read_err=0.0')
-            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
-                          'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
+            self.ceph_manager.inject_args('osd', self.rerrosd,
+                                          'filestore_debug_random_read_err', '0.0')
+            self.ceph_manager.inject_args('osd', self.rerrosd,
+                                          'bluestore_debug_random_read_err', '0.0')
         for pool in list(self.pools_to_fix_pgp_num):
             if self.ceph_manager.get_pool_pg_num(pool) > 0:
                 self.fix_pgp_num(pool)
         self.pools_to_fix_pgp_num.clear()
         for service, opt, saved_value in self.saved_options:
-            self._set_config(service, '*', opt, saved_value)
+            self.ceph_manager.inject_args(service, '*', opt, saved_value)
         self.saved_options = []
         self.all_up_in()
 
@@ -1072,8 +1240,8 @@ class ObjectStoreTool:
             self.pgid = self.manager.get_object_pg_with_shard(self.pool,
                                                               self.object_name,
                                                               self.osd)
-        self.remote = self.manager.ctx.\
-            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()[0]
+        self.remote = next(iter(self.manager.ctx.\
+            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
         path = self.manager.get_filepath().format(id=self.osd)
         self.paths = ("--data-path {path} --journal-path {path}/journal".
                       format(path=path))
@@ -1102,7 +1270,7 @@ class ObjectStoreTool:
 
     def run(self, options, args, stdin=None, stdout=None):
         if stdout is None:
-            stdout = StringIO()
+            stdout = BytesIO()
         self.manager.kill_osd(self.osd)
         cmd = self.build_cmd(options, args, stdin)
         self.manager.log(cmd)
@@ -1110,11 +1278,12 @@ class ObjectStoreTool:
             proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
                                    check_status=False,
                                    stdout=stdout,
-                                   stderr=StringIO())
+                                   stderr=BytesIO())
             proc.wait()
             if proc.exitstatus != 0:
                 self.manager.log("failed with " + str(proc.exitstatus))
-                error = proc.stdout.getvalue() + " " + proc.stderr.getvalue()
+                error = six.ensure_str(proc.stdout.getvalue())  + " " + \
+                        six.ensure_str(proc.stderr.getvalue())
                 raise Exception(error)
         finally:
             if self.do_revive:
@@ -1122,26 +1291,30 @@ class ObjectStoreTool:
                 self.manager.wait_till_osd_is_up(self.osd, 300)
 
 
+# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
+# the same name.
 class CephManager:
     """
     Ceph manager object.
     Contains several local functions that form a bulk of this module.
 
-    Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
-    the same name.
+    :param controller: the remote machine where the Ceph commands should be
+                       executed
+    :param ctx: the cluster context
+    :param config: path to Ceph config file
+    :param logger: for logging messages
+    :param cluster: name of the Ceph cluster
     """
 
-    REPLICATED_POOL = 1
-    ERASURE_CODED_POOL = 3
-
     def __init__(self, controller, ctx=None, config=None, logger=None,
-                 cluster='ceph'):
+                 cluster='ceph', cephadm=False):
         self.lock = threading.RLock()
         self.ctx = ctx
         self.config = config
         self.controller = controller
         self.next_pool_id = 0
         self.cluster = cluster
+        self.cephadm = cephadm
         if (logger):
             self.log = lambda x: logger.info(x)
         else:
@@ -1149,7 +1322,7 @@ class CephManager:
                 """
                 implement log behavior.
                 """
-                print x
+                print(x)
             self.log = tmp
         if self.config is None:
             self.config = dict()
@@ -1158,7 +1331,7 @@ class CephManager:
         for pool in pools:
             # we may race with a pool deletion; ignore failures here
             try:
-                self.pools[pool] = self.get_pool_property(pool, 'pg_num')
+                self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
             except CommandFailedError:
                 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
 
@@ -1166,62 +1339,125 @@ class CephManager:
         """
         Start ceph on a raw cluster.  Return count
         """
-        testdir = teuthology.get_testdir(self.ctx)
-        ceph_args = [
-            'sudo',
-            'adjust-ulimits',
-            'ceph-coverage',
-            '{tdir}/archive/coverage'.format(tdir=testdir),
-            'timeout',
-            '120',
-            'ceph',
-            '--cluster',
-            self.cluster,
-        ]
-        ceph_args.extend(args)
-        proc = self.controller.run(
-            args=ceph_args,
-            stdout=StringIO(),
+        if self.cephadm:
+            proc = shell(self.ctx, self.cluster, self.controller,
+                         args=['ceph'] + list(args),
+                         stdout=BytesIO())
+        else:
+            testdir = teuthology.get_testdir(self.ctx)
+            ceph_args = [
+                'sudo',
+                'adjust-ulimits',
+                'ceph-coverage',
+                '{tdir}/archive/coverage'.format(tdir=testdir),
+                'timeout',
+                '120',
+                'ceph',
+                '--cluster',
+                self.cluster,
+                '--log-early',
+            ]
+            ceph_args.extend(args)
+            proc = self.controller.run(
+                args=ceph_args,
+                stdout=BytesIO(),
             )
-        return proc.stdout.getvalue()
+        return six.ensure_str(proc.stdout.getvalue())
 
-    def raw_cluster_cmd_result(self, *args):
+    def raw_cluster_cmd_result(self, *args, **kwargs):
         """
         Start ceph on a cluster.  Return success or failure information.
         """
-        testdir = teuthology.get_testdir(self.ctx)
-        ceph_args = [
-            'sudo',
-            'adjust-ulimits',
-            'ceph-coverage',
-            '{tdir}/archive/coverage'.format(tdir=testdir),
-            'timeout',
-            '120',
-            'ceph',
-            '--cluster',
-            self.cluster,
-        ]
-        ceph_args.extend(args)
-        proc = self.controller.run(
-            args=ceph_args,
-            check_status=False,
-            )
+        if self.cephadm:
+            proc = shell(self.ctx, self.cluster, self.controller,
+                         args=['ceph'] + list(args),
+                         check_status=False)
+        else:
+            testdir = teuthology.get_testdir(self.ctx)
+            ceph_args = [
+                'sudo',
+                'adjust-ulimits',
+                'ceph-coverage',
+                '{tdir}/archive/coverage'.format(tdir=testdir),
+                'timeout',
+                '120',
+                'ceph',
+                '--cluster',
+                self.cluster,
+            ]
+            ceph_args.extend(args)
+            kwargs['args'] = ceph_args
+            kwargs['check_status'] = False
+            proc = self.controller.run(**kwargs)
         return proc.exitstatus
 
-    def run_ceph_w(self):
+    def run_ceph_w(self, watch_channel=None):
         """
-        Execute "ceph -w" in the background with stdout connected to a StringIO,
+        Execute "ceph -w" in the background with stdout connected to a BytesIO,
         and return the RemoteProcess.
+
+        :param watch_channel: Specifies the channel to be watched. This can be
+                              'cluster', 'audit', ...
+        :type watch_channel: str
+        """
+        args = ["sudo",
+                "daemon-helper",
+                "kill",
+                "ceph",
+                '--cluster',
+                self.cluster,
+                "-w"]
+        if watch_channel is not None:
+            args.append("--watch-channel")
+            args.append(watch_channel)
+        return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
+
+    def get_mon_socks(self):
         """
-        return self.controller.run(
-            args=["sudo",
-                  "daemon-helper",
-                  "kill",
-                  "ceph",
-                  '--cluster',
-                  self.cluster,
-                  "-w"],
-            wait=False, stdout=StringIO(), stdin=run.PIPE)
+        Get monitor sockets.
+
+        :return socks: tuple of strings; strings are individual sockets.
+        """
+        from json import loads
+
+        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
+        socks = []
+        for mon in output['mons']:
+            for addrvec_mem in mon['public_addrs']['addrvec']:
+                socks.append(addrvec_mem['addr'])
+        return tuple(socks)
+
+    def get_msgrv1_mon_socks(self):
+        """
+        Get monitor sockets that use msgrv1 to operate.
+
+        :return socks: tuple of strings; strings are individual sockets.
+        """
+        from json import loads
+
+        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
+        socks = []
+        for mon in output['mons']:
+            for addrvec_mem in mon['public_addrs']['addrvec']:
+                if addrvec_mem['type'] == 'v1':
+                    socks.append(addrvec_mem['addr'])
+        return tuple(socks)
+
+    def get_msgrv2_mon_socks(self):
+        """
+        Get monitor sockets that use msgrv2 to operate.
+
+        :return socks: tuple of strings; strings are individual sockets.
+        """
+        from json import loads
+
+        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
+        socks = []
+        for mon in output['mons']:
+            for addrvec_mem in mon['public_addrs']['addrvec']:
+                if addrvec_mem['type'] == 'v2':
+                    socks.append(addrvec_mem['addr'])
+        return tuple(socks)
 
     def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
         """
@@ -1237,18 +1473,18 @@ class CephManager:
         :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
                              it. (5 min by default)
         """
-        seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
+        seq = {osd: int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
                for osd in osds}
         if not wait_for_mon:
             return
         if no_wait is None:
             no_wait = []
-        for osd, need in seq.iteritems():
+        for osd, need in seq.items():
             if osd in no_wait:
                 continue
             got = 0
             while wait_for_mon > 0:
-                got = self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd)
+                got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
                 self.log('need seq {need} got {got} for osd.{osd}'.format(
                     need=need, got=got, osd=osd))
                 if got >= need:
@@ -1382,8 +1618,21 @@ class CephManager:
         """
         if stdout is None:
             stdout = StringIO()
-        testdir = teuthology.get_testdir(self.ctx)
+
         remote = self.find_remote(service_type, service_id)
+
+        if self.cephadm:
+            return shell(
+                self.ctx, self.cluster, remote,
+                args=[
+                    'ceph', 'daemon', '%s.%s' % (service_type, service_id),
+                ] + command,
+                stdout=stdout,
+                wait=True,
+                check_status=check_status,
+            )
+
+        testdir = teuthology.get_testdir(self.ctx)
         args = [
             'sudo',
             'adjust-ulimits',
@@ -1434,10 +1683,10 @@ class CephManager:
         assert False
 
     def wait_for_pg_stats(func):
-        # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
+        # both osd_mon_report_interval and mgr_stats_period are 5 seconds
         # by default, and take the faulty injection in ms into consideration,
         # 12 seconds are more than enough
-        delays = [1, 1, 2, 3, 5, 8, 13]
+        delays = [1, 1, 2, 3, 5, 8, 13, 0]
         @wraps(func)
         def wrapper(self, *args, **kwargs):
             exc = None
@@ -1493,7 +1742,7 @@ class CephManager:
     def wait_run_admin_socket(self, service_type,
                               service_id, args=['version'], timeout=75, stdout=None):
         """
-        If osd_admin_socket call suceeds, return.  Otherwise wait
+        If osd_admin_socket call succeeds, return.  Otherwise wait
         five seconds and try again.
         """
         if stdout is None:
@@ -1502,7 +1751,7 @@ class CephManager:
         while True:
             proc = self.admin_socket(service_type, service_id,
                                      args, check_status=False, stdout=stdout)
-            if proc.exitstatus is 0:
+            if proc.exitstatus == 0:
                 return proc
             else:
                 tries += 1
@@ -1537,12 +1786,19 @@ class CephManager:
         j = json.loads(proc.stdout.getvalue())
         return j[name]
 
+    def inject_args(self, service_type, service_id, name, value):
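+        """
+        Set a config option on a running daemon via 'ceph tell <who> injectargs'.
+        """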
+        whom = '{0}.{1}'.format(service_type, service_id)
+        if isinstance(value, bool):
+            value = 'true' if value else 'false'
+        opt_arg = '--{name}={value}'.format(name=name, value=value)
+        self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
+
     def set_config(self, osdnum, **argdict):
         """
         :param osdnum: osd number
         :param argdict: dictionary containing values to set.
         """
-        for k, v in argdict.iteritems():
+        for k, v in argdict.items():
             self.wait_run_admin_socket(
                 'osd', osdnum,
                 ['config', 'set', str(k), str(v)])
@@ -1551,7 +1807,7 @@ class CephManager:
         """
         Get status from cluster
         """
-        status = self.raw_cluster_cmd('status', '--format=json-pretty')
+        status = self.raw_cluster_cmd('status', '--format=json')
         return json.loads(status)
 
     def raw_osd_status(self):
@@ -1564,9 +1820,9 @@ class CephManager:
         """
         Get osd statuses sorted by states that the osds are in.
         """
-        osd_lines = filter(
+        osd_lines = list(filter(
             lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
-            self.raw_osd_status().split('\n'))
+            self.raw_osd_status().split('\n')))
         self.log(osd_lines)
         in_osds = [int(i[4:].split()[0])
                    for i in filter(lambda x: " in " in x, osd_lines)]
@@ -1645,7 +1901,7 @@ class CephManager:
         :param erasure_code_use_overwrites: if true, allow overwrites
         """
         with self.lock:
-            assert isinstance(pool_name, basestring)
+            assert isinstance(pool_name, six.string_types)
             assert isinstance(pg_num, int)
             assert pool_name not in self.pools
             self.log("creating pool_name %s" % (pool_name,))
@@ -1697,27 +1953,27 @@ class CephManager:
         :param pool_name: Pool to be removed
         """
         with self.lock:
-            assert isinstance(pool_name, basestring)
+            assert isinstance(pool_name, six.string_types)
             assert pool_name in self.pools
             self.log("removing pool_name %s" % (pool_name,))
             del self.pools[pool_name]
-            self.do_rados(self.controller,
-                          ['rmpool', pool_name, pool_name,
-                           "--yes-i-really-really-mean-it"])
+            self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
+                                 "--yes-i-really-really-mean-it")
 
     def get_pool(self):
         """
         Pick a random pool
         """
         with self.lock:
-            return random.choice(self.pools.keys())
+            if self.pools:
+                return random.sample(self.pools.keys(), 1)[0]
 
     def get_pool_pg_num(self, pool_name):
         """
         Return the number of pgs in the pool specified.
         """
         with self.lock:
-            assert isinstance(pool_name, basestring)
+            assert isinstance(pool_name, six.string_types)
             if pool_name in self.pools:
                 return self.pools[pool_name]
             return 0
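
With the switch from random.choice() to a guarded random.sample(), get_pool() now returns None when no pools are being tracked instead of raising, so callers should check the result. A hedged sketch with an illustrative `mgr` CephManager instance:

    pool = mgr.get_pool()                    # None if self.pools is empty
    if pool is not None:
        pg_num = mgr.get_pool_pg_num(pool)   # cached pg count; 0 for pools the manager does not track
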
@@ -1726,18 +1982,21 @@ class CephManager:
         """
         :param pool_name: pool
         :param prop: property to be checked.
-        :returns: property as an int value.
+        :returns: property as string
         """
         with self.lock:
-            assert isinstance(pool_name, basestring)
-            assert isinstance(prop, basestring)
+            assert isinstance(pool_name, six.string_types)
+            assert isinstance(prop, six.string_types)
             output = self.raw_cluster_cmd(
                 'osd',
                 'pool',
                 'get',
                 pool_name,
                 prop)
-            return int(output.split()[1])
+            return output.split()[1]
+
+    def get_pool_int_property(self, pool_name, prop):
+        return int(self.get_pool_property(pool_name, prop))
 
     def set_pool_property(self, pool_name, prop, val):
         """
@@ -1748,8 +2007,8 @@ class CephManager:
         This routine retries if set operation fails.
         """
         with self.lock:
-            assert isinstance(pool_name, basestring)
-            assert isinstance(prop, basestring)
+            assert isinstance(pool_name, six.string_types)
+            assert isinstance(prop, six.string_types)
             assert isinstance(val, int)
             tries = 0
             while True:
@@ -1776,7 +2035,7 @@ class CephManager:
         Increase the number of pgs in a pool
         """
         with self.lock:
-            assert isinstance(pool_name, basestring)
+            assert isinstance(pool_name, six.string_types)
             assert isinstance(by, int)
             assert pool_name in self.pools
             if self.get_num_creating() > 0:
@@ -1789,26 +2048,68 @@ class CephManager:
             self.pools[pool_name] = new_pg_num
             return True
 
+    def contract_pool(self, pool_name, by, min_pgs):
+        """
+        Decrease the number of pgs in a pool
+        """
+        with self.lock:
+            self.log('contract_pool %s by %s min %s' % (
+                     pool_name, str(by), str(min_pgs)))
+            assert isinstance(pool_name, six.string_types)
+            assert isinstance(by, int)
+            assert pool_name in self.pools
+            if self.get_num_creating() > 0:
+                self.log('too many creating')
+                return False
+            proj = self.pools[pool_name] - by
+            if proj < min_pgs:
+                self.log('would drop below min_pgs, proj %d, currently %d' % (proj,self.pools[pool_name],))
+                return False
+            self.log("decrease pool size by %d" % (by,))
+            new_pg_num = self.pools[pool_name] - by
+            self.set_pool_property(pool_name, "pg_num", new_pg_num)
+            self.pools[pool_name] = new_pg_num
+            return True
+
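
A worked example of the min_pgs guard in contract_pool(), assuming the pool currently has 64 PGs, no PGs are being created, and `mgr` is an illustrative CephManager instance: with by=16 and min_pgs=32 the projection is 64 - 16 = 48 >= 32, so the merge proceeds; repeating with by=32 would project 48 - 32 = 16 < 32 and return False without touching the pool.

    mgr.contract_pool('unique_pool_0', by=16, min_pgs=32)   # 64 -> 48, returns True
    mgr.contract_pool('unique_pool_0', by=32, min_pgs=32)   # 48 - 32 = 16 < 32, returns False
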
+    def stop_pg_num_changes(self):
+        """
+        Reset all pg_num_targets back to pg_num, canceling splits and merges
+        """
+        self.log('Canceling any pending splits or merges...')
+        osd_dump = self.get_osd_dump_json()
+        try:
+            for pool in osd_dump['pools']:
+                if pool['pg_num'] != pool['pg_num_target']:
+                    self.log('Setting pool %s (%d) pg_num %d -> %d' %
+                             (pool['pool_name'], pool['pool'],
+                              pool['pg_num_target'],
+                              pool['pg_num']))
+                    self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
+                                         'pg_num', str(pool['pg_num']))
+        except KeyError:
+            # we don't support pg_num_target before nautilus
+            pass
+
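
stop_pg_num_changes() pushes pg_num back to its current value for every pool whose pg_num_target differs, cancelling in-flight splits and merges; on releases without pg_num_target the KeyError branch makes it a no-op. One plausible call sequence when winding a thrasher down (sketch only, `mgr` illustrative):

    mgr.stop_pg_num_changes()         # freeze pg_num where it is
    mgr.wait_for_clean(timeout=1200)  # then wait for PGs to settle
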
     def set_pool_pgpnum(self, pool_name, force):
         """
         Set pgpnum property of pool_name pool.
         """
         with self.lock:
-            assert isinstance(pool_name, basestring)
+            assert isinstance(pool_name, six.string_types)
             assert pool_name in self.pools
             if not force and self.get_num_creating() > 0:
                 return False
             self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
             return True
 
-    def list_pg_missing(self, pgid):
+    def list_pg_unfound(self, pgid):
         """
-        return list of missing pgs with the id specified
+        return list of unfound objects for the pg with the id specified
         """
         r = None
         offset = {}
         while True:
-            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_missing',
+            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
                                        json.dumps(offset))
             j = json.loads(out)
             if r is None:
@@ -1830,7 +2131,10 @@ class CephManager:
         """
         out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
         j = json.loads('\n'.join(out.split('\n')[1:]))
-        return j['pg_stats']
+        try:
+            return j['pg_map']['pg_stats']
+        except KeyError:
+            return j['pg_stats']
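
get_pg_stats() now tolerates both JSON layouts of `ceph pg dump --format=json`: newer releases nest the stats under 'pg_map', older ones keep 'pg_stats' at the top level. A small self-contained illustration of the same fallback the try/except implements, using hand-built dictionaries:

    new_style = {'pg_map': {'pg_stats': [{'pgid': '1.0', 'state': 'active+clean'}]}}
    old_style = {'pg_stats': [{'pgid': '1.0', 'state': 'active+clean'}]}
    for j in (new_style, old_style):
        stats = j.get('pg_map', j)['pg_stats']   # equivalent to the try/except above
        assert stats[0]['state'] == 'active+clean'
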
 
     def get_pgids_to_force(self, backfill):
         """
@@ -1879,13 +2183,13 @@ class CephManager:
                 ret[status] += 1
         return ret
 
-    @wait_for_pg_stats
+    @wait_for_pg_stats # type: ignore
     def with_pg_state(self, pool, pgnum, check):
         pgstr = self.get_pgid(pool, pgnum)
         stats = self.get_single_pg_stats(pgstr)
         assert(check(stats['state']))
 
-    @wait_for_pg_stats
+    @wait_for_pg_stats # type: ignore
     def with_pg(self, pool, pgnum, check):
         pgstr = self.get_pgid(pool, pgnum)
         stats = self.get_single_pg_stats(pgstr)
@@ -1962,7 +2266,7 @@ class CephManager:
         """
         pool_dump = self.get_pool_dump(pool)
         object_map = self.get_object_map(pool, name)
-        if pool_dump["type"] == CephManager.ERASURE_CODED_POOL:
+        if pool_dump["type"] == PoolType.ERASURE_CODED:
             shard = object_map['acting'].index(osdid)
             return "{pgid}s{shard}".format(pgid=object_map['pgid'],
                                            shard=shard)
@@ -1998,6 +2302,14 @@ class CephManager:
         """
         return self.get_osd_dump_json()['osds']
 
+    def get_osd_metadata(self):
+        """
+        osd metadata --format=json converted to a python object
+        :returns: the python object containing osd metadata information
+        """
+        out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
+        return json.loads('\n'.join(out.split('\n')[1:]))
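
get_osd_metadata() parses `ceph osd metadata --format=json`, which yields one dict per OSD. A hedged sketch of pulling a field out of it; `mgr` is illustrative and the lookup is guarded with .get():

    meta = mgr.get_osd_metadata()
    stores = {m['id']: m.get('osd_objectstore') for m in meta}   # e.g. {0: 'bluestore', 1: 'bluestore'}
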
+
     def get_mgr_dump(self):
         out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
         return json.loads(out)
@@ -2008,7 +2320,7 @@ class CephManager:
         """
         out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
                                    '--format=json')
-        return json.loads(out)
+        return json.loads(out).get('stuck_pg_stats',[])
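
get_stuck_pgs() now returns just the 'stuck_pg_stats' list, and an empty list when the key is missing, rather than the whole dump_stuck document. Illustrative use with an assumed `mgr` instance:

    stuck = mgr.get_stuck_pgs('inactive', 300)    # PGs stuck inactive for more than 300s, [] if none
    mgr.log('%d stuck inactive pgs' % len(stuck))
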
 
     def get_num_unfound_objects(self):
         """
@@ -2034,6 +2346,9 @@ class CephManager:
         Find the number of active and clean pgs.
         """
         pgs = self.get_pg_stats()
+        return self._get_num_active_clean(pgs)
+
+    def _get_num_active_clean(self, pgs):
         num = 0
         for pg in pgs:
             if (pg['state'].count('active') and
@@ -2047,6 +2362,9 @@ class CephManager:
         Find the number of active and recovered pgs.
         """
         pgs = self.get_pg_stats()
+        return self._get_num_active_recovered(pgs)
+
+    def _get_num_active_recovered(self, pgs):
         num = 0
         for pg in pgs:
             if (pg['state'].count('active') and
@@ -2072,6 +2390,9 @@ class CephManager:
         Find the number of active pgs.
         """
         pgs = self.get_pg_stats()
+        return self._get_num_active(pgs)
+
+    def _get_num_active(self, pgs):
         num = 0
         for pg in pgs:
             if pg['state'].count('active') and not pg['state'].count('stale'):
@@ -2097,6 +2418,9 @@ class CephManager:
         Find the number of pgs that are either active or down.
         """
         pgs = self.get_pg_stats()
+        return self._get_num_active_down(pgs)
+
+    def _get_num_active_down(self, pgs):
         num = 0
         for pg in pgs:
             if ((pg['state'].count('active') and not
@@ -2108,25 +2432,42 @@ class CephManager:
                 num += 1
         return num
 
+    def get_num_peered(self):
+        """
+        Find the number of PGs that are peered
+        """
+        pgs = self.get_pg_stats()
+        return self._get_num_peered(pgs)
+
+    def _get_num_peered(self, pgs):
+        num = 0
+        for pg in pgs:
+            if pg['state'].count('peered') and not pg['state'].count('stale'):
+                num += 1
+        return num
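
The private _get_num_*() helpers let one `pg dump` feed several counters, which is what the reworked is_clean()/is_recovered()/all_active_or_peered() below rely on instead of re-querying the monitors per check. Sketch (illustrative `mgr`):

    pgs = mgr.get_pg_stats()                  # single dump
    clean = mgr._get_num_active_clean(pgs)    # several counters derived from it
    peered = mgr._get_num_peered(pgs)
    mgr.log('%d/%d clean, %d peered' % (clean, len(pgs), peered))
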
+
     def is_clean(self):
         """
         True if all pgs are clean
         """
-        return self.get_num_active_clean() == self.get_num_pgs()
+        pgs = self.get_pg_stats()
+        return self._get_num_active_clean(pgs) == len(pgs)
 
     def is_recovered(self):
         """
         True if all pgs have recovered
         """
-        return self.get_num_active_recovered() == self.get_num_pgs()
+        pgs = self.get_pg_stats()
+        return self._get_num_active_recovered(pgs) == len(pgs)
 
     def is_active_or_down(self):
         """
         True if all pgs are active or down
         """
-        return self.get_num_active_down() == self.get_num_pgs()
+        pgs = self.get_pg_stats()
+        return self._get_num_active_down(pgs) == len(pgs)
 
-    def wait_for_clean(self, timeout=None):
+    def wait_for_clean(self, timeout=1200):
         """
         Returns true when all pgs are clean.
         """
@@ -2225,8 +2566,8 @@ class CephManager:
                 else:
                     self.log("no progress seen, keeping timeout for now")
                     if now - start >= timeout:
-                       if self.is_recovered():
-                           break
+                        if self.is_recovered():
+                            break
                         self.log('dumping pgs')
                         out = self.raw_cluster_cmd('pg', 'dump')
                         self.log(out)
@@ -2310,6 +2651,13 @@ class CephManager:
         """
         return self.get_num_active() == self.get_num_pgs()
 
+    def all_active_or_peered(self):
+        """
+        Wrapper to check if all PGs are active or peered
+        """
+        pgs = self.get_pg_stats()
+        return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)
+
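
One plausible way a caller might poll the new all_active_or_peered() wrapper while OSDs are down; the timeout and sleep values are illustrative, as is `mgr`:

    import time
    start = time.time()
    while not mgr.all_active_or_peered():
        assert time.time() - start < 300, 'PGs never became active or peered'
        time.sleep(3)
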
     def wait_till_active(self, timeout=None):
         """
         Wait until all pgs are active.
@@ -2370,11 +2718,9 @@ class CephManager:
             remote.console.power_off()
         elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
             if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
-                self.raw_cluster_cmd(
-                    '--', 'tell', 'osd.%d' % osd,
-                    'injectargs',
-                    '--bdev-inject-crash %d' % self.config.get('bdev_inject_crash'),
-                )
+                self.inject_args(
+                    'osd', osd,
+                    'bdev-inject-crash', self.config.get('bdev_inject_crash'))
                 try:
                     self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
                 except:
@@ -2396,9 +2742,8 @@ class CephManager:
         """
         Stop osd if nothing else works.
         """
-        self.raw_cluster_cmd('--', 'tell', 'osd.%d' % osd,
-                             'injectargs',
-                             '--objectstore-blackhole')
+        self.inject_args('osd', osd,
+                         'objectstore-blackhole', True)
         time.sleep(2)
         self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
 
@@ -2454,7 +2799,7 @@ class CephManager:
     ## monitors
     def signal_mon(self, mon, sig, silent=False):
         """
-        Wrapper to local get_deamon call
+        Wrapper to local get_daemon call
         """
         self.ctx.daemons.get_daemon('mon', mon,
                                     self.cluster).signal(sig, silent=silent)
@@ -2505,8 +2850,7 @@ class CephManager:
         """
         Extract all the monitor status information from the cluster
         """
-        addr = self.ctx.ceph[self.cluster].conf['mon.%s' % mon]['mon addr']
-        out = self.raw_cluster_cmd('-m', addr, 'mon_status')
+        out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
         return json.loads(out)
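
get_mon_status() now asks the monitor directly via `ceph tell mon.<id> mon_status` instead of resolving 'mon addr' from the test config. A hedged sketch, with a hypothetical monitor id 'a':

    status = mgr.get_mon_status('a')           # runs: ceph tell mon.a mon_status
    if status['rank'] in status['quorum']:     # quorum is a list of ranks in the mon_status output
        mgr.log('mon.a is in quorum')
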
 
     def get_mon_quorum(self):
@@ -2541,17 +2885,15 @@ class CephManager:
             self.log('health:\n{h}'.format(h=out))
         return json.loads(out)
 
-    def get_mds_status(self, mds):
-        """
-        Run cluster commands for the mds in order to get mds information
-        """
-        out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
-        j = json.loads(' '.join(out.splitlines()[1:]))
-        # collate; for dup ids, larger gid wins.
-        for info in j['info'].itervalues():
-            if info['name'] == mds:
-                return info
-        return None
+    def wait_until_healthy(self, timeout=None):
+        self.log("wait_until_healthy")
+        start = time.time()
+        while self.get_mon_health()['status'] != 'HEALTH_OK':
+            if timeout is not None:
+                assert time.time() - start < timeout, \
+                    'timeout expired in wait_until_healthy'
+            time.sleep(3)
+        self.log("wait_until_healthy done")
 
     def get_filepath(self):
         """
@@ -2569,6 +2911,24 @@ class CephManager:
         remote.run(args=['sudo',
                          'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
 
+    def get_service_task_status(self, service, status_key):
+        """
+        Return daemon task status for a given ceph service.
+
+        :param service: ceph service (mds, osd, etc...)
+        :param status_key: matching task status key
+        """
+        task_status = {}
+        status = self.raw_cluster_status()
+        try:
+            for k,v in status['servicemap']['services'][service]['daemons'].items():
+                ts = dict(v).get('task_status', None)
+                if ts:
+                    task_status[k] = ts[status_key]
+        except KeyError: # catches missing service and status key
+            return {}
+        self.log(task_status)
+        return task_status
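
A hedged usage sketch of get_service_task_status(); both the service name and the task-status key depend on what the daemons publish in the servicemap, so 'mds' and 'scrub status' below are illustrative:

    per_daemon = mgr.get_service_task_status('mds', 'scrub status')   # {} if the service or key is absent
    for daemon, value in per_daemon.items():
        mgr.log('%s: %s' % (daemon, value))
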
 
 def utility_task(name):
     """