]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/rook/module.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / pybind / mgr / rook / module.py
index 4b49d68100e96906c0b7fe1565b0c2591566f8a8..cabc5080a790a08c7cc14cb1b2fcf9d083a6b7cd 100644 (file)
@@ -1,9 +1,14 @@
+import datetime
 import threading
 import functools
 import os
-import uuid
+
+from ceph.deployment import inventory
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
+
 try:
-    from typing import List
+    from typing import List, Dict, Optional, Callable, Any
+    from ceph.deployment.drive_group import DriveGroupSpec
 except ImportError:
     pass  # just for type checking
 
@@ -12,6 +17,13 @@ try:
     from kubernetes.client.rest import ApiException
 
     kubernetes_imported = True
+
+    # https://github.com/kubernetes-client/python/issues/895
+    from kubernetes.client.models.v1_container_image import V1ContainerImage
+    def names(self, names):
+        self._names = names
+    V1ContainerImage.names = V1ContainerImage.names.setter(names)
+
 except ImportError:
     kubernetes_imported = False
     client = None
@@ -23,108 +35,13 @@ import orchestrator
 from .rook_cluster import RookCluster
 
 
-all_completions = []
-
-
-class RookReadCompletion(orchestrator.ReadCompletion):
-    """
-    All reads are simply API calls: avoid spawning
-    huge numbers of threads by just running them
-    inline when someone calls wait()
-    """
-
-    def __init__(self, cb):
-        super(RookReadCompletion, self).__init__()
-        self.cb = cb
-        self._result = None
-        self._complete = False
-
-        self.message = "<read op>"
-
-        # XXX hacky global
-        global all_completions
-        all_completions.append(self)
-
-    @property
-    def result(self):
-        return self._result
-
-    @property
-    def is_complete(self):
-        return self._complete
-
-    def execute(self):
-        self._result = self.cb()
-        self._complete = True
-
-
-class RookWriteCompletion(orchestrator.WriteCompletion):
-    """
-    Writes are a two-phase thing, firstly sending
-    the write to the k8s API (fast) and then waiting
-    for the corresponding change to appear in the
-    Ceph cluster (slow)
-    """
-    # XXX kubernetes bindings call_api already usefully has
-    # a completion= param that uses threads.  Maybe just
-    # use that?
-    def __init__(self, execute_cb, complete_cb, message):
-        super(RookWriteCompletion, self).__init__()
-        self.execute_cb = execute_cb
-        self.complete_cb = complete_cb
-
-        # Executed means I executed my k8s API call, it may or may
-        # not have succeeded
-        self.executed = False
-
-        # Result of k8s API call, this is set if executed==True
-        self._result = None
-
-        self.effective = False
-
-        self.id = str(uuid.uuid4())
-
-        self.message = message
-
-        self.error = None
-
-        # XXX hacky global
-        global all_completions
-        all_completions.append(self)
-
-    def __str__(self):
-        return self.message
-
-    @property
-    def result(self):
-        return self._result
-
-    @property
-    def is_persistent(self):
-        return (not self.is_errored) and self.executed
-
-    @property
-    def is_effective(self):
-        return self.effective
-
-    @property
-    def is_errored(self):
-        return self.error is not None
-
-    def execute(self):
-        if not self.executed:
-            self._result = self.execute_cb()
-            self.executed = True
-
-        if not self.effective:
-            # TODO: check self.result for API errors
-            if self.complete_cb is None:
-                self.effective = True
-            else:
-                self.effective = self.complete_cb()
+class RookCompletion(orchestrator.Completion):
+    def evaluate(self):
+        self.finalize(None)
 
 
 def deferred_read(f):
+    # type: (Callable) -> Callable[..., RookCompletion]
     """
     Decorator to make RookOrchestrator methods return
     a completion object that executes themselves.
@@ -132,11 +49,25 @@ def deferred_read(f):
 
     @functools.wraps(f)
     def wrapper(*args, **kwargs):
-        return RookReadCompletion(lambda: f(*args, **kwargs))
+        return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
 
     return wrapper
 
 
+def write_completion(on_complete,  # type: Callable
+                     message,  # type: str
+                     mgr,
+                     calc_percent=None  # type: Optional[Callable[[], RookCompletion]]
+                     ):
+    # type: (...) -> RookCompletion
+    return RookCompletion.with_progress(
+        message=message,
+        mgr=mgr,
+        on_complete=lambda _: on_complete(),
+        calc_percent=calc_percent,
+    )
+
+
 class RookEnv(object):
     def __init__(self):
         # POD_NAMESPACE already exist for Rook 0.9
@@ -145,7 +76,7 @@ class RookEnv(object):
         # ROOK_CEPH_CLUSTER_CRD_NAME is new is Rook 1.0
         self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)
 
-        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', "rook-ceph-system")
+        self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
         self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
         self.api_name = "ceph.rook.io/" + self.crd_version
 
@@ -157,42 +88,27 @@ class RookEnv(object):
 
 
 class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
-    MODULE_OPTIONS = [
-        # TODO: configure k8s API addr instead of assuming local
-    ]
-
-    def wait(self, completions):
-        self.log.info("wait: completions={0}".format(completions))
-
-        incomplete = False
+    """
+    Writes are a two-phase thing, firstly sending
+    the write to the k8s API (fast) and then waiting
+    for the corresponding change to appear in the
+    Ceph cluster (slow)
 
-        # Our `wait` implementation is very simple because everything's
-        # just an API call.
-        for c in completions:
-            if not isinstance(c, RookReadCompletion) and \
-                    not isinstance(c, RookWriteCompletion):
-                raise TypeError(
-                    "wait() requires list of completions, not {0}".format(
-                        c.__class__
-                    ))
+    Right now, wre calling the k8s API synchronously.
+    """
 
-            if c.is_complete:
-                continue
+    MODULE_OPTIONS = [
+        # TODO: configure k8s API addr instead of assuming local
+    ]  # type: List[Dict[str, Any]]
 
-            try:
-                c.execute()
-            except Exception as e:
-                if not isinstance(e, orchestrator.OrchestratorError):
-                    self.log.exception("Completion {0} threw an exception:".format(
-                        c.message
-                    ))
-                c.exception = e
-                c._complete = True
+    def process(self, completions):
+        # type: (List[RookCompletion]) -> None
 
-            if not c.is_complete:
-                incomplete = True
+        if completions:
+            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
 
-        return not incomplete
+            for p in completions:
+                p.evaluate()
 
     @staticmethod
     def can_run():
@@ -225,18 +141,23 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._shutdown = threading.Event()
 
+        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
+
     def shutdown(self):
         self._shutdown.set()
 
     @property
     def k8s(self):
+        # type: () -> client.CoreV1Api
         self._initialized.wait()
+        assert self._k8s is not None
         return self._k8s
 
     @property
     def rook_cluster(self):
         # type: () -> RookCluster
         self._initialized.wait()
+        assert self._rook_cluster is not None
         return self._rook_cluster
 
     def serve(self):
@@ -244,7 +165,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
         # a Rook cluster.  For development convenience, also support
         # running outside (reading ~/.kube config)
 
-        if self._rook_env.cluster_name:
+        if self._rook_env.has_namespace():
             config.load_incluster_config()
             cluster_name = self._rook_env.cluster_name
         else:
@@ -280,166 +201,329 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
             # in case we had a caller that wait()'ed on them long enough
             # to get persistence but not long enough to get completion
 
-            global all_completions
-            self.wait(all_completions)
-            all_completions = [c for c in all_completions if not c.is_complete]
+            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
+            for p in self.all_progress_references:
+                p.update()
 
             self._shutdown.wait(5)
 
-        # TODO: watch Rook for config changes to complain/update if
-        # things look a bit out of sync?
+    def cancel_completions(self):
+        for p in self.all_progress_references:
+            p.fail()
+        self.all_progress_references.clear()
 
     @deferred_read
-    def get_inventory(self, node_filter=None, refresh=False):
-        node_list = None
-        if node_filter and node_filter.nodes:
-            # Explicit node list
-            node_list = node_filter.nodes
-        elif node_filter and node_filter.labels:
-            # TODO: query k8s API to resolve to node list, and pass
+    def get_inventory(self, host_filter=None, refresh=False):
+        host_list = None
+        if host_filter and host_filter.hosts:
+            # Explicit host list
+            host_list = host_filter.hosts
+        elif host_filter and host_filter.labels:
+            # TODO: query k8s API to resolve to host list, and pass
             # it into RookCluster.get_discovered_devices
             raise NotImplementedError()
 
-        devs = self.rook_cluster.get_discovered_devices(node_list)
+        devs = self.rook_cluster.get_discovered_devices(host_list)
 
         result = []
-        for node_name, node_devs in devs.items():
+        for host_name, host_devs in devs.items():
             devs = []
-            for d in node_devs:
-                dev = orchestrator.InventoryDevice()
-
-                # XXX CAUTION!  https://github.com/rook/rook/issues/1716
-                # Passing this through for the sake of completeness but it
-                # is not trustworthy!
-                dev.blank = d['empty']
-                dev.type = 'hdd' if d['rotational'] else 'ssd'
-                dev.id = d['name']
-                dev.size = d['size']
-
-                if d['filesystem'] == "" and not d['rotational']:
-                    # Empty or partitioned SSD
-                    partitioned_space = sum(
-                        [p['size'] for p in d['Partitions']])
-                    dev.metadata_space_free = max(0, d[
-                        'size'] - partitioned_space)
-
+            for d in host_devs:
+                dev = inventory.Device(
+                    path='/dev/' + d['name'],
+                    sys_api=dict(
+                        rotational='1' if d['rotational'] else '0',
+                        size=d['size']
+                    ),
+                    available=d['empty'],
+                    rejected_reasons=[] if d['empty'] else ['not empty'],
+                )
                 devs.append(dev)
 
-            result.append(orchestrator.InventoryNode(node_name, devs))
+            result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))
 
         return result
 
     @deferred_read
-    def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
-
-        if service_type not in ("mds", "osd", "mgr", "mon", "nfs", None):
-            raise orchestrator.OrchestratorValidationError(service_type + " unsupported")
+    def get_hosts(self):
+        # type: () -> List[orchestrator.HostSpec]
+        return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
 
-        pods = self.rook_cluster.describe_pods(service_type, service_id, node_name)
+    @deferred_read
+    def describe_service(self, service_type=None, service_name=None,
+                         refresh=False):
+        now = datetime.datetime.utcnow()
+
+        # CephCluster
+        cl = self.rook_cluster.rook_api_get(
+            "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
+        self.log.debug('CephCluster %s' % cl)
+        image_name = cl['spec'].get('cephVersion', {}).get('image', None)
+        num_nodes = len(self.rook_cluster.get_node_names())
+
+        spec = {}
+        spec['mon'] = orchestrator.ServiceDescription(
+            service_name='mon',
+            spec=ServiceSpec(
+                'mon',
+                placement=PlacementSpec(
+                    count=cl['spec'].get('mon', {}).get('count', 1),
+                ),
+            ),
+            size=cl['spec'].get('mon', {}).get('count', 1),
+            container_image_name=image_name,
+            last_refresh=now,
+        )
+        spec['mgr'] = orchestrator.ServiceDescription(
+            service_name='mgr',
+            spec=ServiceSpec(
+                'mgr',
+                placement=PlacementSpec.from_string('count:1'),
+            ),
+            size=1,
+            container_image_name=image_name,
+            last_refresh=now,
+        )
+        if not cl['spec'].get('crashCollector', {}).get('disable', False):
+            spec['crash'] = orchestrator.ServiceDescription(
+                service_name='crash',
+                spec=ServiceSpec(
+                    'crash',
+                    placement=PlacementSpec.from_string('*'),
+                ),
+                size=num_nodes,
+                container_image_name=image_name,
+                last_refresh=now,
+            )
+
+        # CephFilesystems
+        all_fs = self.rook_cluster.rook_api_get(
+            "cephfilesystems/")
+        self.log.debug('CephFilesystems %s' % all_fs)
+        for fs in all_fs.get('items', []):
+            svc = 'mds.' + fs['metadata']['name']
+            if svc in spec:
+                continue
+            # FIXME: we are conflating active (+ standby) with count
+            active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
+            total_mds = active
+            if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
+                total_mds = active * 2
+            spec[svc] = orchestrator.ServiceDescription(
+                service_name=svc,
+                spec=ServiceSpec(
+                    svc,
+                    placement=PlacementSpec(count=active),
+                ),
+                size=total_mds,
+                container_image_name=image_name,
+                last_refresh=now,
+            )
+
+        # CephObjectstores
+        all_zones = self.rook_cluster.rook_api_get(
+            "cephobjectstores/")
+        self.log.debug('CephObjectstores %s' % all_zones)
+        for zone in all_zones.get('items', []):
+            rgw_realm = zone['metadata']['name']
+            rgw_zone = rgw_realm
+            svc = 'rgw.' + rgw_realm + '.' + rgw_zone
+            if svc in spec:
+                continue
+            active = zone['spec']['gateway']['instances'];
+            if 'securePort' in zone['spec']['gateway']:
+                ssl = True
+                port = zone['spec']['gateway']['securePort']
+            else:
+                ssl = False
+                port = zone['spec']['gateway']['port'] or 80
+            spec[svc] = orchestrator.ServiceDescription(
+                service_name=svc,
+                spec=RGWSpec(
+                    rgw_realm=rgw_realm,
+                    rgw_zone=rgw_zone,
+                    ssl=ssl,
+                    rgw_frontend_port=port,
+                    placement=PlacementSpec(count=active),
+                ),
+                size=active,
+                container_image_name=image_name,
+                last_refresh=now,
+            )
+
+        for dd in self._list_daemons():
+            if dd.service_name() not in spec:
+                continue
+            spec[dd.service_name()].running += 1
+        return [v for k, v in spec.items()]
 
+    @deferred_read
+    def list_daemons(self, daemon_type=None, daemon_id=None, host=None,
+                     refresh=False):
+        return self._list_daemons(daemon_type, daemon_id, host, refresh)
+
+    def _list_daemons(self, daemon_type=None, daemon_id=None, host=None,
+                      refresh=False):
+        pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
+        self.log.debug('pods %s' % pods)
         result = []
         for p in pods:
-            sd = orchestrator.ServiceDescription()
-            sd.nodename = p['nodename']
+            sd = orchestrator.DaemonDescription()
+            sd.hostname = p['hostname']
             sd.container_id = p['name']
-            sd.service_type = p['labels']['app'].replace('rook-ceph-', '')
-
-            if sd.service_type == "osd":
-                sd.service_instance = "%s" % p['labels']["ceph-osd-id"]
-            elif sd.service_type == "mds":
-                sd.service = p['labels']['rook_file_system']
-                pfx = "{0}-".format(sd.service)
-                sd.service_instance = p['labels']['ceph_daemon_id'].replace(pfx, '', 1)
-            elif sd.service_type == "mon":
-                sd.service_instance = p['labels']["mon"]
-            elif sd.service_type == "mgr":
-                sd.service_instance = p['labels']["mgr"]
-            elif sd.service_type == "nfs":
-                sd.service = p['labels']['ceph_nfs']
-                sd.service_instance = p['labels']['instance']
-                sd.rados_config_location = self.rook_cluster.get_nfs_conf_url(sd.service, sd.service_instance)
-            elif sd.service_type == "rgw":
-                sd.service = p['labels']['rgw']
-                sd.service_instance = p['labels']['ceph_daemon_id']
+            sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
+            status = {
+                'Pending': -1,
+                'Running': 1,
+                'Succeeded': 0,
+                'Failed': -1,
+                'Unknown': -1,
+            }[p['phase']]
+            sd.status = status
+            sd.status_desc = p['phase']
+
+            if 'ceph_daemon_id' in p['labels']:
+                sd.daemon_id = p['labels']['ceph_daemon_id']
+            elif 'ceph-osd-id' in p['labels']:
+                sd.daemon_id = p['labels']['ceph-osd-id']
             else:
                 # Unknown type -- skip it
                 continue
 
+            sd.container_image_name = p['container_image_name']
+
+            sd.created = p['created']
+            sd.last_configured = p['created']
+            sd.last_deployed = p['created']
+            sd.started = p['started']
+            sd.last_refresh = p['refreshed']
             result.append(sd)
 
         return result
 
     def _service_add_decorate(self, typename, spec, func):
-        return RookWriteCompletion(lambda: func(spec), None,
-                    "Creating {0} services for {1}".format(typename, spec.name))
-
-    def add_stateless_service(self, service_type, spec):
-        # assert isinstance(spec, orchestrator.StatelessServiceSpec)
-        if service_type == "mds":
-            return self._service_add_decorate("Filesystem", spec,
-                                         self.rook_cluster.add_filesystem)
-        elif service_type == "rgw" :
-            return self._service_add_decorate("RGW", spec,
-                                         self.rook_cluster.add_objectstore)
-        elif service_type == "nfs" :
-            return self._service_add_decorate("NFS", spec,
-                                         self.rook_cluster.add_nfsgw)
-        else:
-            raise NotImplementedError(service_type)
-
-    def remove_stateless_service(self, service_type, service_id):
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.rm_service(service_type, service_id), None,
-            "Removing {0} services for {1}".format(service_type, service_id))
-
-    def update_mons(self, num, hosts):
-        if hosts:
-            raise RuntimeError("Host list is not supported by rook.")
-
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.update_mon_count(num), None,
-            "Updating mon count to {0}".format(num))
-
-    def update_stateless_service(self, svc_type, spec):
-        # only nfs is currently supported
-        if svc_type != "nfs":
-            raise NotImplementedError(svc_type)
-
-        num = spec.count
-        return RookWriteCompletion(
-            lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
-                "Updating NFS server count in {0} to {1}".format(spec.name, num))
-
-    def create_osds(self, drive_group, all_hosts):
-        # type: (orchestrator.DriveGroupSpec, List[str]) -> RookWriteCompletion
-
-        assert len(drive_group.hosts(all_hosts)) == 1
-        targets = []
-        if drive_group.data_devices:
-            targets += drive_group.data_devices.paths
+        return write_completion(
+            on_complete=lambda : func(spec),
+            message="Creating {} services for {}".format(typename, spec.service_id),
+            mgr=self
+        )
+
+    def add_nfs(self, spec):
+        # type: (NFSServiceSpec) -> RookCompletion
+        return self._service_add_decorate("NFS", spec,
+                                          self.rook_cluster.add_nfsgw)
+
+    def _service_rm_decorate(self, typename, name, func):
+        return write_completion(
+            on_complete=lambda : func(name),
+            message="Removing {} services for {}".format(typename, name),
+            mgr=self
+        )
+
+    def remove_service(self, service_type, service_name):
+        if service_type == 'mds':
+            return self._service_rm_decorate(
+                'MDS', service_name, lambda: self.rook_cluster.rm_service(
+                    'cephfilesystems', service_name)
+            )
+        elif service_type == 'rgw':
+            return self._service_rm_decorate(
+                'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
+            )
+        elif service_type == 'nfs':
+            return self._service_rm_decorate(
+                'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
+            )
+
+    def apply_mon(self, spec):
+        # type: (ServiceSpec) -> RookCompletion
+        if spec.placement.hosts or spec.placement.label:
+            raise RuntimeError("Host list or label is not supported by rook.")
+
+        return write_completion(
+            lambda: self.rook_cluster.update_mon_count(spec.placement.count),
+            "Updating mon count to {0}".format(spec.placement.count),
+            mgr=self
+        )
+
+    def apply_mds(self, spec):
+        # type: (ServiceSpec) -> RookCompletion
+        return self._service_add_decorate('MDS', spec,
+                                          self.rook_cluster.apply_filesystem)
+
+    def apply_rgw(self, spec):
+        # type: (RGWSpec) -> RookCompletion
+        return self._service_add_decorate('RGW', spec,
+                                          self.rook_cluster.apply_objectstore)
+
+
+    def apply_nfs(self, spec):
+        # type: (NFSServiceSpec) -> RookCompletion
+        num = spec.placement.count
+        return write_completion(
+            lambda: self.rook_cluster.update_nfs_count(spec.service_id, num),
+            "Updating NFS server count in {0} to {1}".format(spec.service_id, num),
+            mgr=self
+        )
+
+    def remove_daemons(self, names):
+        return write_completion(
+            lambda: self.rook_cluster.remove_pods(names),
+            "Removing daemons {}".format(','.join(names)),
+            mgr=self
+        )
+
+    def create_osds(self, drive_group):
+        # type: (DriveGroupSpec) -> RookCompletion
+        """ Creates OSDs from a drive group specification.
+
+        $: ceph orch osd create -i <dg.file>
+
+        The drivegroup file must only contain one spec at a time.
+        """
+
+        targets = []  # type: List[str]
+        if drive_group.data_devices and drive_group.data_devices.paths:
+            targets += [d.path for d in drive_group.data_devices.paths]
         if drive_group.data_directories:
             targets += drive_group.data_directories
 
-        if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
-            raise RuntimeError("Node '{0}' is not in the Kubernetes "
-                               "cluster".format(drive_group.hosts(all_hosts)))
+        def execute(all_hosts_):
+            # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
+            all_hosts = [h.hostname for h in all_hosts_]
+            matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)
+
+            assert len(matching_hosts) == 1
+
+            if not self.rook_cluster.node_exists(matching_hosts[0]):
+                raise RuntimeError("Node '{0}' is not in the Kubernetes "
+                                   "cluster".format(matching_hosts))
+
+            # Validate whether cluster CRD can accept individual OSD
+            # creations (i.e. not useAllDevices)
+            if not self.rook_cluster.can_create_osd():
+                raise RuntimeError("Rook cluster configuration does not "
+                                   "support OSD creation.")
 
-        # Validate whether cluster CRD can accept individual OSD
-        # creations (i.e. not useAllDevices)
-        if not self.rook_cluster.can_create_osd():
-            raise RuntimeError("Rook cluster configuration does not "
-                               "support OSD creation.")
+            return orchestrator.Completion.with_progress(
+                message="Creating OSD on {0}:{1}".format(
+                        matching_hosts,
+                        targets),
+                mgr=self,
+                on_complete=lambda _:self.rook_cluster.add_osds(drive_group, all_hosts),
+                calc_percent=lambda: has_osds(all_hosts)
+            )
 
-        def execute():
-            return self.rook_cluster.add_osds(drive_group, all_hosts)
+        @deferred_read
+        def has_osds(all_hosts):
+            matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)
 
-        def is_complete():
             # Find OSD pods on this host
             pod_osd_ids = set()
-            pods = self._k8s.list_namespaced_pod(self._rook_env.namespace,
+            pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
                                                  label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
                                                  field_selector="spec.nodeName={0}".format(
-                                                     drive_group.hosts(all_hosts)[0]
+                                                     matching_hosts[0]
                                                  )).items
             for p in pods:
                 pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
@@ -463,7 +547,5 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
 
             return found is not None
 
-        return RookWriteCompletion(execute, is_complete,
-                                   "Creating OSD on {0}:{1}".format(
-                                       drive_group.hosts(all_hosts)[0], targets
-                                   ))
+        c = self.get_hosts().then(execute)
+        return c