git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/rook/rook_cluster.py
import ceph quincy 17.2.6
index ef4040754e3de85a0f04fcfd71310cdfeba2f7e9..2ac1dcb8a843982be376b601f1fa6fa32b06d5cb 100644 (file)
@@ -6,31 +6,80 @@ call methods.
 
 This module is runnable outside of ceph-mgr, useful for testing.
 """
+import datetime
+import threading
 import logging
-import json
 from contextlib import contextmanager
+from time import sleep
+import re
+from orchestrator import OrchResult
 
-from six.moves.urllib.parse import urljoin  # pylint: disable=import-error
+import jsonpatch
+from urllib.parse import urljoin
+import json
 
 # Optional kubernetes imports to enable MgrModule.can_run
 # to behave cleanly.
+from urllib3.exceptions import ProtocolError
+
+from ceph.deployment.inventory import Device
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
+from ceph.utils import datetime_now
+from ceph.deployment.drive_selection.matchers import SizeMatcher
+from nfs.cluster import create_ganesha_pool
+from nfs.module import Module
+from nfs.export import NFSRados
+from mgr_module import NFS_POOL_NAME
+from mgr_util import merge_dicts
+
+from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
+    Iterable, Dict, Iterator, Type
+
 try:
+    from kubernetes import client, watch
     from kubernetes.client.rest import ApiException
 except ImportError:
-    ApiException = None
+    class ApiException(Exception):  # type: ignore
+        status = 0
+
+from .rook_client.ceph import cephfilesystem as cfs
+from .rook_client.ceph import cephnfs as cnfs
+from .rook_client.ceph import cephobjectstore as cos
+from .rook_client.ceph import cephcluster as ccl
+from .rook_client.ceph import cephrbdmirror as crbdm
+from .rook_client._helper import CrdClass
+
+import orchestrator
 
 try:
-    import orchestrator
-    from rook.module import RookEnv
-    from typing import List
+    from rook.module import RookEnv, RookOrchestrator
 except ImportError:
     pass  # just used for type checking.
 
 
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable)
+
+CrdClassT = TypeVar('CrdClassT', bound=CrdClass)
+
+
 log = logging.getLogger(__name__)
 
 
-class ApplyException(Exception):
+def __urllib3_supports_read_chunked() -> bool:
+    # CentOS 7 ships a urllib3 older than the version required by
+    # kubernetes-client, which lacks HTTPResponse.read_chunked
+    try:
+        from urllib3.response import HTTPResponse
+        return hasattr(HTTPResponse, 'read_chunked')
+    except ImportError:
+        return False
+
+
+_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
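+# Evaluated once at import time: when False, KubernetesResource skips the
+# watch thread and re-runs a full fetch each time .items is accessed.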
+
+class ApplyException(orchestrator.OrchestratorError):
     """
     For failures to update the Rook CRDs, usually indicating
     some kind of interference between our attempted update
@@ -38,21 +87,639 @@ class ApplyException(Exception):
     """
 
+def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
+    def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
+        t = threading.Thread(target=f, args=args, kwargs=kwargs)
+        t.start()
+        return t
+
+    return cast(Callable[..., threading.Thread], wrapper)
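+
+# Usage sketch (illustrative, not part of the original module): a @threaded
+# function returns the started Thread rather than its own result, so the
+# caller can keep a handle on the worker:
+#
+#     @threaded
+#     def _worker() -> None:
+#         ...
+#
+#     t = _worker()  # thread is already running
+#     t.join()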
+
+
+class DefaultFetcher():
+    def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
+        self.storage_class = storage_class
+        self.coreV1_api = coreV1_api
+
+    def fetch(self) -> None:
+        self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
+        self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]
+
+    def convert_size(self, size_str: str) -> int:
+        units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
+        coeff_and_unit = re.search(r'(\d+)(\D+)', size_str)
+        assert coeff_and_unit is not None
+        coeff = int(coeff_and_unit[1])
+        unit = coeff_and_unit[2]
+        try:
+            # binary and decimal suffixes sit seven entries apart in the
+            # tuple, so index % 7 maps "G" and "Gi" to the same power of 1024
+            factor = units.index(unit) % 7
+        except ValueError:
+            log.error("PV size format invalid")
+            raise
+        size = coeff * (2 ** (10 * factor))
+        return size
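+
+    # Worked example (illustrative): convert_size('10Gi') returns
+    # 10 * 2 ** 30 == 10737418240, and convert_size('10G') yields the same
+    # value because of the % 7 folding above.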
+
+    def devices(self) -> Dict[str, List[Device]]:
+        nodename_to_devices: Dict[str, List[Device]] = {}
+        for i in self.pvs_in_sc:
+            node, device = self.device(i)
+            if node not in nodename_to_devices:
+                nodename_to_devices[node] = []
+            nodename_to_devices[node].append(device)
+        return nodename_to_devices
+
+    def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
+        node = 'N/A'
+        if i.spec.node_affinity:
+            terms = i.spec.node_affinity.required.node_selector_terms
+            if len(terms) == 1 and len(terms[0].match_expressions) == 1 and terms[0].match_expressions[0].key == 'kubernetes.io/hostname' and len(terms[0].match_expressions[0].values) == 1:
+                node = terms[0].match_expressions[0].values[0]
+        size = self.convert_size(i.spec.capacity['storage'])
+        if i.spec.host_path:
+            path = i.spec.host_path.path
+        elif i.spec.local:
+            path = i.spec.local.path
+        elif i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations:
+            path = '/dev/' + i.metadata.annotations['storage.openshift.com/device-name']
+        else:
+            path = ''
+        state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
+        pv_name = i.metadata.name
+        device = Device(
+                path = path,
+                sys_api = dict(
+                    size = size,
+                    node = node,
+                    pv_name = pv_name
+                ),
+                available = state,
+        )
+        return (node, device)
+
+
+class LSOFetcher(DefaultFetcher):
+    def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
+        super().__init__(storage_class, coreV1_api)
+        self.customObjects_api = customObjects_api
+        self.nodenames = nodenames
+
+    def fetch(self) -> None:
+        super().fetch()
+        self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
+                                                 group="local.storage.openshift.io",
+                                                 version="v1alpha1",
+                                                 plural="localvolumediscoveryresults")
+
+    def predicate(self, item: 'client.V1ConfigMapList') -> bool:
+        if self.nodenames is not None:
+            return item['spec']['nodeName'] in self.nodenames
+        else:
+            return True
+
+    def devices(self) -> Dict[str, List[Device]]:
+        try:
+            lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
+        except ApiException:
+            log.error("Failed to fetch device metadata")
+            raise
+        self.lso_devices = {}
+        for i in lso_discovery_results:
+            drives = i['status']['discoveredDevices']
+            for drive in drives:
+                self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
+        nodename_to_devices: Dict[str, List[Device]] = {}
+        for i in self.pvs_in_sc:
+            node, device = (None, None)
+            if (not i.metadata.annotations) or ('storage.openshift.com/device-id' not in i.metadata.annotations) or (i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
+                node, device = super().device(i)
+            else:
+                node, device = self.device(i)
+            if node not in nodename_to_devices:
+                nodename_to_devices[node] = []
+            nodename_to_devices[node].append(device)
+        return nodename_to_devices
+
+    def device(self, i: Any) -> Tuple[str, Device]:
+        node = i.metadata.labels['kubernetes.io/hostname']
+        device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
+        pv_name = i.metadata.name
+        vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
+        model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
+        device = Device(
+            path = device_discovery['path'],
+            sys_api = dict(
+                    size = device_discovery['size'],
+                    rotational = '1' if device_discovery['property']=='Rotational' else '0',
+                    node = node,
+                    pv_name = pv_name,
+                    model = model,
+                    vendor = vendor
+                ),
+            available = device_discovery['status']['state']=='Available',
+            device_id = device_discovery['deviceID'].split('/')[-1],
+            lsm_data = dict(
+                serialNum = device_discovery['serial']
+            )
+        )
+        return (node, device)
+
+
+class PDFetcher(DefaultFetcher):
+    """ Physical Devices Fetcher"""
+    def __init__(self, coreV1_api: 'client.CoreV1Api'):
+        self.coreV1_api = coreV1_api
+
+    def fetch(self) -> None:
+        """ Collect the devices information from k8s configmaps"""
+        self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
+                                                              namespace='rook-ceph',
+                                                              label_selector='app=rook-discover')
+
+    def devices(self) -> Dict[str, List[Device]]:
+        """ Return the list of devices found"""
+        node_devices: Dict[str, List[Device]] = {}
+        for i in self.dev_cms.items:
+            devices_list: List[Device] = []
+            for d in json.loads(i.data['devices']):
+                devices_list.append(self.device(d)[1])
+            node_devices[i.metadata.labels['rook.io/node']] = devices_list
+
+        return node_devices
+
+    def device(self, devData: Dict[str,str]) -> Tuple[str, Device]:
+        """ Build an orchestrator device """
+        if 'cephVolumeData' in devData and devData['cephVolumeData']:
+            return "", Device.from_json(json.loads(devData['cephVolumeData']))
+        else:
+            return "", Device(
+                path='/dev/' + devData['name'],
+                sys_api=dict(
+                    rotational='1' if devData['rotational'] else '0',
+                    size=devData['size']
+                ),
+                available=False,
+                rejected_reasons=['device data coming from ceph-volume not provided'],
+            )
+
+
+class KubernetesResource(Generic[T]):
+    def __init__(self, api_func: Callable, **kwargs: Any) -> None:
+        """
+        Generic kubernetes Resource parent class
+
+        The api fetch and watch methods should be common across resource types.
+
+        Exceptions in the runner thread are propagated to the caller.
+
+        :param api_func: kubernetes client api function that is passed to the watcher
+        """
+        self.kwargs = kwargs
+        self.api_func = api_func
+
+        # ``_items`` is accessed by different threads. I assume assignment is atomic.
+        self._items: Dict[str, T] = dict()
+        self.thread = None  # type: Optional[threading.Thread]
+        self.exception: Optional[Exception] = None
+        if not _urllib3_supports_read_chunked:
+            logging.info('urllib3 is too old. Fallback to full fetches')
+
+    def _fetch(self) -> str:
+        """ Execute the requested api method as a one-off fetch"""
+        response = self.api_func(**self.kwargs)
+        metadata = response.metadata
+        self._items = {item.metadata.name: item for item in response.items}
+        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+        return metadata.resource_version
+
+    @property
+    def items(self) -> Iterable[T]:
+        """
+        Returns the items of the request.
+        Creates the watcher as a side effect.
+        :return:
+        """
+        if self.exception:
+            e = self.exception
+            self.exception = None
+            raise e  # Propagate the exception to the user.
+        if not self.thread or not self.thread.is_alive():
+            resource_version = self._fetch()
+            if _urllib3_supports_read_chunked:
+                # Start a thread which will use the kubernetes watch client against a resource
+                log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
+                self.thread = self._watch(resource_version)
+
+        return self._items.values()
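+
+    # Usage sketch (illustrative): the first access of .items performs a
+    # full fetch and, when urllib3 supports chunked reads, starts a
+    # background watch that keeps the cached items current, e.g.:
+    #
+    #     pods = KubernetesResource(core_api.list_namespaced_pod,
+    #                               namespace='rook-ceph')
+    #     names = [p.metadata.name for p in pods.items]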
+
+    def get_item_name(self, item: Any) -> Any:
+        try:
+            return item.metadata.name
+        except AttributeError:
+            raise AttributeError(
+                "{} doesn't contain a metadata.name. Unable to track changes".format(
+                    self.api_func))
+
+    @threaded
+    def _watch(self, res_ver: Optional[str]) -> None:
+        """ worker thread that runs the kubernetes watch """
+
+        self.exception = None
+
+        w = watch.Watch()
+
+        try:
+            # execute generator to continually watch resource for changes
+            for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
+                                  **self.kwargs):
+                self.health = ''
+                item = event['object']
+                name = self.get_item_name(item)
+
+                log.info('{} event: {}'.format(event['type'], name))
+
+                if event['type'] in ('ADDED', 'MODIFIED'):
+                    self._items = merge_dicts(self._items, {name: item})
+                elif event['type'] == 'DELETED':
+                    self._items = {k:v for k,v in self._items.items() if k != name}
+                elif event['type'] == 'BOOKMARK':
+                    pass
+                elif event['type'] == 'ERROR':
+                    raise ApiException(str(event))
+                else:
+                    raise KeyError('Unknown watch event {}'.format(event['type']))
+        except ProtocolError as e:
+            if 'Connection broken' in str(e):
+                log.info('Connection reset.')
+                return
+            raise
+        except ApiException as e:
+            log.exception('K8s API failed. {}'.format(self.api_func))
+            self.exception = e
+            raise
+        except Exception as e:
+            log.exception("Watcher failed. ({})".format(self.api_func))
+            self.exception = e
+            raise
+
+class KubernetesCustomResource(KubernetesResource):
+    def _fetch(self) -> str:
+        response = self.api_func(**self.kwargs)
+        metadata = response['metadata']
+        self._items = {item['metadata']['name']: item for item in response['items']}
+        log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+        return metadata['resourceVersion']
+
+    def get_item_name(self, item: Any) -> Any:
+        try:
+            return item['metadata']['name']
+        except (KeyError, TypeError):
+            raise AttributeError(
+                "{} doesn't contain a metadata.name. Unable to track changes".format(
+                    self.api_func))
+
+class DefaultCreator():
+    def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
+        self.coreV1_api = coreV1_api
+        self.storage_class = storage_class
+        self.inventory = inventory
+
+    def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
+        device_set = ccl.StorageClassDeviceSetsItem(
+                    name=d.sys_api['pv_name'],
+                    volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
+                    count=1,
+                    encrypted=drive_group.encrypted,
+                    portable=False
+                )
+        device_set.volumeClaimTemplates.append(
+            ccl.VolumeClaimTemplatesItem(
+                metadata=ccl.Metadata(
+                    name="data"
+                ),
+                spec=ccl.Spec(
+                    storageClassName=self.storage_class,
+                    volumeMode="Block",
+                    accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
+                    resources={
+                        "requests":{
+                                "storage": 1
+                        }
+                    },
+                    volumeName=d.sys_api['pv_name']
+                )
+            )
+        )
+        return device_set
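+
+    # Each matched PV becomes a one-OSD StorageClassDeviceSet whose claim
+    # template is pinned to that PV via volumeName, so Rook schedules exactly
+    # one OSD on each selected device.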
+
+    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+        device_list = []
+        assert drive_group.data_devices is not None
+        sizematcher: Optional[SizeMatcher] = None
+        if drive_group.data_devices.size:
+            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
+        limit = getattr(drive_group.data_devices, 'limit', None)
+        count = 0
+        all = getattr(drive_group.data_devices, 'all', None)
+        paths = [device.path for device in drive_group.data_devices.paths]
+        osd_list = []
+        for pod in rook_pods.items:
+            if (
+                hasattr(pod, 'metadata') 
+                and hasattr(pod.metadata, 'labels') 
+                and 'osd' in pod.metadata.labels 
+                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
+            ):
+                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+        for _, node in self.inventory.items():
+            for device in node:
+                if device.sys_api['pv_name'] in osd_list:
+                    count += 1
+        for _, node in self.inventory.items():
+            for device in node:
+                if not limit or (count < limit):
+                    if device.available:
+                        if (
+                            all 
+                            or (
+                                device.sys_api['node'] in matching_hosts
+                                and (sizematcher is None or sizematcher.compare(device))
+                                and (
+                                    not drive_group.data_devices.paths
+                                    or (device.path in paths)
+                                )
+                            )
+                        ):
+                            device_list.append(device)
+                            count += 1
+        return device_list
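+
+    # Note on the two passes above: the first counts devices already consumed
+    # by OSD pods so that data_devices.limit is honoured across
+    # re-applications; the second selects available devices that match the
+    # host/size/path filters.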
+
+    def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
+        to_create = self.filter_devices(rook_pods, drive_group,matching_hosts)
+        assert drive_group.data_devices is not None
+        def _add_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
+                new_cluster.spec.storage = ccl.Storage()
+
+            if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
+                new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
+
+            existing_scds = [
+                scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
+            ]
+            for device in to_create:
+                new_scds = self.device_to_device_set(drive_group, device)
+                if new_scds.name not in existing_scds:
+                    new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
+            return new_cluster
+        return _add_osds
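+
+    # The returned closure is applied to the CephCluster CRD by the caller;
+    # RookCluster.add_osds() (see the end of this diff) hands it to _patch(),
+    # which submits the resulting change as a json patch.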
+
+class LSOCreator(DefaultCreator):
+    def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+        device_list = []
+        assert drive_group.data_devices is not None
+        sizematcher = None
+        if drive_group.data_devices.size:
+            sizematcher = SizeMatcher('size', drive_group.data_devices.size)
+        limit = getattr(drive_group.data_devices, 'limit', None)
+        all = getattr(drive_group.data_devices, 'all', None)
+        paths = [device.path for device in drive_group.data_devices.paths]
+        vendor = getattr(drive_group.data_devices, 'vendor', None)
+        model = getattr(drive_group.data_devices, 'model', None)
+        count = 0
+        osd_list = []
+        for pod in rook_pods.items:
+            if (
+                hasattr(pod, 'metadata') 
+                and hasattr(pod.metadata, 'labels') 
+                and 'osd' in pod.metadata.labels 
+                and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
+            ):
+                osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+        for _, node in self.inventory.items():
+            for device in node:
+                if device.sys_api['pv_name'] in osd_list:
+                    count += 1
+        for _, node in self.inventory.items():
+            for device in node:
+                if not limit or (count < limit):
+                    if device.available:
+                        if (
+                            all 
+                            or (
+                                device.sys_api['node'] in matching_hosts
+                                and (sizematcher is None or sizematcher.compare(device))
+                                and (
+                                    not drive_group.data_devices.paths
+                                    or device.path in paths
+                                ) 
+                                and (
+                                    not vendor 
+                                    or device.sys_api['vendor'] == vendor
+                                )
+                                and (
+                                    not model 
+                    or device.sys_api['model'].startswith(model)
+                                )
+                            )
+                        ):
+                            device_list.append(device)
+                            count += 1
+        return device_list
+
+class DefaultRemover():
+    def __init__(
+        self,
+        coreV1_api: 'client.CoreV1Api', 
+        batchV1_api: 'client.BatchV1Api', 
+        appsV1_api: 'client.AppsV1Api', 
+        osd_ids: List[str], 
+        replace_flag: bool, 
+        force_flag: bool, 
+        mon_command: Callable, 
+        patch: Callable, 
+        rook_env: 'RookEnv',
+        inventory: Dict[str, List[Device]]
+    ):
+        self.batchV1_api = batchV1_api
+        self.appsV1_api = appsV1_api
+        self.coreV1_api = coreV1_api
+
+        self.osd_ids = osd_ids
+        self.replace_flag = replace_flag
+        self.force_flag = force_flag
+
+        self.mon_command = mon_command
+
+        self.patch = patch
+        self.rook_env = rook_env
+
+        self.inventory = inventory
+        self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
+        self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
+        self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')
+
+
+    def remove_device_sets(self) -> str:
+        self.to_remove: Dict[str, int] = {}
+        self.pvc_to_remove: List[str] = []
+        for pod in self.osd_pods.items:
+            if (
+                hasattr(pod, 'metadata') 
+                and hasattr(pod.metadata, 'labels') 
+                and 'osd' in pod.metadata.labels 
+                and pod.metadata.labels['osd'] in self.osd_ids
+            ):
+                if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
+                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
+                else:
+                    self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
+                self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
+        def _remove_osds(current_cluster, new_cluster):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
+            # iterate over a copy, as matching device sets may be removed
+            for _set in list(new_cluster.spec.storage.storageClassDeviceSets):
+                if _set.name in self.to_remove:
+                    if _set.count == self.to_remove[_set.name]:
+                        new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
+                    else:
+                        _set.count = _set.count - self.to_remove[_set.name]
+            return new_cluster
+        return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
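+
+    # As in DefaultCreator.add_osds, the CRD edit is expressed as a closure
+    # over the current/desired cluster objects and handed to the injected
+    # patch() callable.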
+
+    def check_force(self) -> None:
+        if not self.force_flag:
+            safe_args = {'prefix': 'osd safe-to-destroy',
+                        'ids': [str(x) for x in self.osd_ids]}
+            ret, out, err = self.mon_command(safe_args)
+            if ret != 0:
+                raise RuntimeError(err)
+
+    def set_osds_down(self) -> None:
+        down_flag_args = {
+            'prefix': 'osd down',
+            'ids': [str(x) for x in self.osd_ids]
+        }
+        ret, out, err = self.mon_command(down_flag_args)
+        if ret != 0:
+            raise RuntimeError(err)
+
+    def scale_deployments(self) -> None:
+        for osd_id in self.osd_ids:
+            self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
+                spec=client.V1ScaleSpec(
+                    replicas=0
+                )
+            ))
+
+    def set_osds_out(self) -> None:
+        out_flag_args = {
+            'prefix': 'osd out',
+            'ids': [str(x) for x in self.osd_ids]
+        }
+        ret, out, err = self.mon_command(out_flag_args)
+        if ret != 0:
+            raise RuntimeError(err)
+
+    def delete_deployments(self) -> None:
+        for osd_id in self.osd_ids:
+            self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')
+
+    def clean_up_prepare_jobs_and_pvc(self) -> None:
+        for job in self.jobs.items:
+            if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
+                self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
+                self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')
+
+    def purge_osds(self) -> None:
+        for id in self.osd_ids:
+            purge_args = {
+                'prefix': 'osd purge-actual',
+                'id': int(id),
+                'yes_i_really_mean_it': True
+            }
+            ret, out, err = self.mon_command(purge_args)
+            if ret != 0:
+                raise RuntimeError(err)
+
+    def destroy_osds(self) -> None:
+        for id in self.osd_ids:
+            destroy_args = {
+                'prefix': 'osd destroy-actual',
+                'id': int(id),
+                'yes_i_really_mean_it': True
+            }
+            ret, out, err = self.mon_command(destroy_args)
+            if ret != 0:
+                raise RuntimeError(err)
+
+    def remove(self) -> str:
+        try:
+            self.check_force()
+        except Exception as e:
+            log.exception("Error checking if OSDs are safe to destroy")
+            return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
+        try:
+            remove_result = self.remove_device_sets()
+        except Exception as e:
+            log.exception("Error patching ceph cluster CRD")
+            return f"Not possible to modify Ceph cluster CRD: {e}"
+        try:
+            self.scale_deployments()
+            self.delete_deployments()
+            self.clean_up_prepare_jobs_and_pvc()
+        except Exception as e:
+            log.exception("Ceph cluster CRD patched, but error cleaning environment")
+            return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
+        try:
+            self.set_osds_down()
+            self.set_osds_out()
+            if self.replace_flag:
+                self.destroy_osds()
+            else:
+                self.purge_osds()
+        except Exception as e:
+            log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
+            return f"Error removing OSDs from Ceph cluster: {e}"
+
+        return remove_result
+
+
 class RookCluster(object):
-    def __init__(self, k8s, rook_env):
+    # The import of client.CoreV1Api must stay optional at import time so
+    # that mgr/rook can still be imported when kubernetes is unavailable.
+    def __init__(
+        self,
+        coreV1_api: 'client.CoreV1Api',
+        batchV1_api: 'client.BatchV1Api',
+        customObjects_api: 'client.CustomObjectsApi',
+        storageV1_api: 'client.StorageV1Api',
+        appsV1_api: 'client.AppsV1Api',
+        rook_env: 'RookEnv',
+        storage_class: 'str'
+    ):
         self.rook_env = rook_env  # type: RookEnv
-        self.k8s = k8s
-
-    def rook_url(self, path):
+        self.coreV1_api = coreV1_api  # client.CoreV1Api
+        self.batchV1_api = batchV1_api
+        self.customObjects_api = customObjects_api
+        self.storageV1_api = storageV1_api  # client.StorageV1Api
+        self.appsV1_api = appsV1_api  # client.AppsV1Api
+        self.storage_class = storage_class # type: str
+
+        #  TODO: replace direct k8s calls with Rook API calls
+        self.storage_classes: KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
+
+        self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
+                                            namespace=self.rook_env.namespace,
+                                            label_selector="rook_cluster={0}".format(
+                                                self.rook_env.namespace))
+        self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
+
+    def rook_url(self, path: str) -> str:
         prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
             self.rook_env.crd_version, self.rook_env.namespace)
         return urljoin(prefix, path)
 
-    def rook_api_call(self, verb, path, **kwargs):
+    def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
         full_path = self.rook_url(path)
         log.debug("[%s] %s" % (verb, full_path))
 
-        return self.k8s.api_client.call_api(
+        return self.coreV1_api.api_client.call_api(
             full_path,
             verb,
             auth_settings=['BearerToken'],
@@ -61,46 +728,47 @@ class RookCluster(object):
             _preload_content=True,
             **kwargs)
 
-    def rook_api_get(self, path, **kwargs):
+    def rook_api_get(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("GET", path, **kwargs)
 
-    def rook_api_delete(self, path):
+    def rook_api_delete(self, path: str) -> Any:
         return self.rook_api_call("DELETE", path)
 
-    def rook_api_patch(self, path, **kwargs):
+    def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("PATCH", path,
                                   header_params={"Content-Type": "application/json-patch+json"},
                                   **kwargs)
 
-    def rook_api_post(self, path, **kwargs):
+    def rook_api_post(self, path: str, **kwargs: Any) -> Any:
         return self.rook_api_call("POST", path, **kwargs)
 
-    def get_discovered_devices(self, nodenames=None):
-        # TODO: replace direct k8s calls with Rook API calls
-        # when they're implemented
-        label_selector = "app=rook-discover"
-        if nodenames is not None:
-            # FIXME: is there a practical or official limit on the
-            # number of entries in a label selector
-            label_selector += ", rook.io/node in ({0})".format(
-                ", ".join(nodenames))
-
-        try:
-            result = self.k8s.list_namespaced_config_map(
-                self.rook_env.operator_namespace,
-                label_selector=label_selector)
-        except ApiException as e:
-            log.exception("Failed to fetch device metadata: {0}".format(e))
-            raise
-
-        nodename_to_devices = {}
-        for i in result.items:
-            drives = json.loads(i.data['devices'])
-            nodename_to_devices[i.metadata.labels['rook.io/node']] = drives
+    def get_storage_class(self) -> 'client.V1StorageClass':
+        matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
+        if len(matching_sc) == 0:
+            log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
+            raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
+        return matching_sc[0]
+
+    def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
+        self.fetcher: Optional[DefaultFetcher] = None
+        op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
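+        # Pick the inventory source: the Rook discovery daemon's configmaps
+        # when enabled, the OpenShift Local Storage Operator when the storage
+        # class is LSO-owned, or plain PVs in the storage class otherwise.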
+        if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
+            self.fetcher = PDFetcher(self.coreV1_api)
+        else:
+            storage_class = self.get_storage_class()
+            if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
+                self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
+            else:
+                self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
 
-        return nodename_to_devices
+        self.fetcher.fetch()
+        return self.fetcher.devices()
 
-    def get_nfs_conf_url(self, nfs_cluster, instance):
+    def get_osds(self) -> List:
+        osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
+        return list(osd_pods.items)
+
+    def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
         #
         # Fetch cephnfs object for "nfs_cluster" and then return a rados://
         # URL for the instance within that cluster. If the fetch fails, just
@@ -121,74 +789,118 @@ class RookCluster(object):
             url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
         return url
 
+    def describe_pods(self,
+                      service_type: Optional[str],
+                      service_id: Optional[str],
+                      nodename: Optional[str]) -> List[Dict[str, Any]]:
+        """
+        Go query the k8s API about deployment, containers related to this
+        filesystem
 
-    def describe_pods(self, service_type, service_id, nodename):
-        # Go query the k8s API about deployment, containers related to this
-        # filesystem
-
-        # Inspect the Rook YAML, to decide whether this filesystem
-        # is Ceph-managed or Rook-managed
-        # TODO: extend Orchestrator interface to describe whether FS
-        # is manageable by us or not
-
-        # Example Rook Pod labels for a mgr daemon:
-        # Labels:         app=rook-ceph-mgr
-        #                 pod-template-hash=2171958073
-        #                 rook_cluster=rook
-        # And MDS containers additionally have `rook_filesystem` label
-
-        # Label filter is rook_cluster=<cluster name>
-        #                 rook_file_system=<self.fs_name>
-
-        label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name)
-        if service_type != None:
-            label_filter += ",app=rook-ceph-{0}".format(service_type)
-            if service_id != None:
-                if service_type == "mds":
-                    label_filter += ",rook_file_system={0}".format(service_id)
-                elif service_type == "osd":
-                    # Label added in https://github.com/rook/rook/pull/1698
-                    label_filter += ",ceph-osd-id={0}".format(service_id)
-                elif service_type == "mon":
-                    # label like mon=rook-ceph-mon0
-                    label_filter += ",mon={0}".format(service_id)
-                elif service_type == "mgr":
-                    label_filter += ",mgr={0}".format(service_id)
-                elif service_type == "nfs":
-                    label_filter += ",ceph_nfs={0}".format(service_id)
-                elif service_type == "rgw":
-                    # TODO: rgw
-                    pass
-
-        field_filter = ""
-        if nodename != None:
-            field_filter = "spec.nodeName={0}".format(nodename)
+        Example Rook Pod labels for a mgr daemon:
+        Labels:         app=rook-ceph-mgr
+                        pod-template-hash=2171958073
+                        rook_cluster=rook
+        And MDS containers additionally have `rook_filesystem` label
 
-        pods = self.k8s.list_namespaced_pod(
-            self.rook_env.namespace,
-            label_selector=label_filter,
-            field_selector=field_filter)
+        Label filter is rook_cluster=<cluster namespace>
+                        rook_file_system=<self.fs_name>
+        """
+        def predicate(item):
+            # type: (client.V1Pod) -> bool
+            metadata = item.metadata
+            if service_type is not None:
+                if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
+                    return False
+
+                if service_id is not None:
+                    try:
+                        k, v = {
+                            "mds": ("rook_file_system", service_id),
+                            "osd": ("ceph-osd-id", service_id),
+                            "mon": ("mon", service_id),
+                            "mgr": ("mgr", service_id),
+                            "nfs": ("nfs", service_id),
+                            "rgw": ("ceph_rgw", service_id),
+                        }[service_type]
+                    except KeyError:
+                        raise orchestrator.OrchestratorValidationError(
+                            '{} not supported'.format(service_type))
+                    if metadata.labels[k] != v:
+                        return False
+
+            if nodename is not None:
+                if item.spec.node_name != nodename:
+                    return False
+            return True
 
-        # import json
-        # print json.dumps(pods.items[0])
+        refreshed = datetime_now()
+        pods = [i for i in self.rook_pods.items if predicate(i)]
 
         pods_summary = []
+        prefix = 'sha256:'
 
-        for p in pods.items:
+        for p in pods:
             d = p.to_dict()
-            # p['metadata']['creationTimestamp']
-            # p['metadata']['nodeName']
-            pods_summary.append({
+
+            image_name = None
+            for c in d['spec']['containers']:
+                # look at the first listed container in the pod...
+                image_name = c['image']
+                break
+
+            ls = d['status'].get('container_statuses')
+            if not ls:
+                # ignore pods with no containers
+                continue
+            image_id = ls[0]['image_id']
+            image_id = image_id.split(prefix)[1] if prefix in image_id else image_id
+
+            s = {
                 "name": d['metadata']['name'],
-                "nodename": d['spec']['node_name'],
-                "labels": d['metadata']['labels']
-            })
-            pass
+                "hostname": d['spec']['node_name'],
+                "labels": d['metadata']['labels'],
+                'phase': d['status']['phase'],
+                'container_image_name': image_name,
+                'container_image_id': image_id,
+                'refreshed': refreshed,
+                # these may get set below...
+                'started': None,
+                'created': None,
+            }
+
+            # note: we want UTC
+            if d['metadata'].get('creation_timestamp', None):
+                s['created'] = d['metadata']['creation_timestamp'].astimezone(
+                    tz=datetime.timezone.utc)
+            if d['status'].get('start_time', None):
+                s['started'] = d['status']['start_time'].astimezone(
+                    tz=datetime.timezone.utc)
+
+            pods_summary.append(s)
 
         return pods_summary
 
+    def remove_pods(self, names: List[str]) -> List[str]:
+        pods = [i for i in self.rook_pods.items]
+        for p in pods:
+            d = p.to_dict()
+            daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-','')
+            daemon_id = d['metadata']['labels']['ceph_daemon_id']
+            name = daemon_type + '.' + daemon_id
+            if name in names:
+                self.coreV1_api.delete_namespaced_pod(
+                    d['metadata']['name'],
+                    self.rook_env.namespace,
+                    body=client.V1DeleteOptions()
+                )
+        return [f'Removed Pod {n}' for n in names]
+
+    def get_node_names(self) -> List[str]:
+        return [i.metadata.name for i in self.nodes.items]
+
     @contextmanager
-    def ignore_409(self, what):
+    def ignore_409(self, what: str) -> Iterator[None]:
         try:
             yield
         except ApiException as e:
@@ -198,114 +910,220 @@ class RookCluster(object):
             else:
                 raise
 
-    def add_filesystem(self, spec):
+    def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
+                         leaf_type: str) -> str:
         # TODO use spec.placement
         # TODO warn if spec.extended has entries we don't know how
         #      to action.
-
-        rook_fs = {
-            "apiVersion": self.rook_env.api_name,
-            "kind": "CephFilesystem",
-            "metadata": {
-                "name": spec.name,
-                "namespace": self.rook_env.namespace
-            },
-            "spec": {
-                "onlyManageDaemons": True,
-                "metadataServer": {
-                    "activeCount": spec.count,
-                    "activeStandby": True
-
-                }
-            }
-        }
-
-        with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.name)):
-            self.rook_api_post("cephfilesystems/", body=rook_fs)
-
-    def add_nfsgw(self, spec):
+        all_hosts = self.get_hosts()
+        def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
+            new.spec.metadataServer.activeCount = spec.placement.count or 1
+            new.spec.metadataServer.placement = cfs.Placement(
+                nodeAffinity=cfs.NodeAffinity(
+                    requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
+                        nodeSelectorTerms=cfs.NodeSelectorTermsList(
+                            [placement_spec_to_node_selector(spec.placement, all_hosts)]
+                        )
+                    )
+                )
+            )
+            return new
+        def _create_fs() -> cfs.CephFilesystem:
+            fs = cfs.CephFilesystem(
+                apiVersion=self.rook_env.api_name,
+                metadata=dict(
+                    name=spec.service_id,
+                    namespace=self.rook_env.namespace,
+                ),
+                spec=cfs.Spec(
+                    dataPools=cfs.DataPoolsList(
+                        {
+                            cfs.DataPoolsItem(
+                                failureDomain=leaf_type,
+                                replicated=cfs.Replicated(
+                                    size=num_replicas
+                                )
+                            )
+                        }
+                    ),
+                    metadataPool=cfs.MetadataPool(
+                        failureDomain=leaf_type,
+                        replicated=cfs.Replicated(
+                            size=num_replicas
+                        )
+                    ),
+                    metadataServer=cfs.MetadataServer(
+                        activeCount=spec.placement.count or 1,
+                        activeStandby=True,
+                        placement=cfs.Placement(
+                            nodeAffinity=cfs.NodeAffinity(
+                                requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
+                                    nodeSelectorTerms=cfs.NodeSelectorTermsList(
+                                        [placement_spec_to_node_selector(spec.placement, all_hosts)]
+                                    )
+                                )
+                            )
+                        )
+                    )
+                )
+            )
+            return fs
+        assert spec.service_id is not None
+        return self._create_or_patch(
+            cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
+            _update_fs, _create_fs)
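+
+    # _create_or_patch, defined later in this file, patches the existing
+    # CephFilesystem CRD via _update_fs or creates it from _create_fs.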
+
+    def get_matching_node(self, host: str) -> Any:
+        matching_node = None
+        for node in self.nodes.items:
+            if node.metadata.labels['kubernetes.io/hostname'] == host:
+                matching_node = node
+        return matching_node
+
+    def add_host_label(self, host: str, label: str) -> OrchResult[str]:
+        matching_node = self.get_matching_node(host)
+        if matching_node is None:
+            return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster"))
+        matching_node.metadata.labels['ceph-label/'+ label] = ""
+        self.coreV1_api.patch_node(host, matching_node)
+        return OrchResult(f'Added {label} label to {host}')
+
+    def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
+        matching_node = self.get_matching_node(host)
+        if matching_node is None:
+            return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster"))
+        matching_node.metadata.labels.pop('ceph-label/' + label, None)
+        self.coreV1_api.patch_node(host, matching_node)
+        return OrchResult(f'Removed {label} label from {host}')
+
+    def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str:
+        assert spec.service_id is not None
+
+        name = spec.service_id
+
+        if '.' in spec.service_id:
+            # rook does not like . in the name.  It could be there because
+            # this is a legacy rgw spec that was named like $realm.$zone,
+            # except that I doubt there were any users of this code.
+            # Instead, focus on future users and translate . to -
+            # (fingers crossed!).
+            name = spec.service_id.replace('.', '-')
+
+        all_hosts = self.get_hosts()
+        def _create_zone() -> cos.CephObjectStore:
+            port = None
+            secure_port = None
+            if spec.ssl:
+                secure_port = spec.get_port()
+            else:
+                port = spec.get_port()
+            object_store = cos.CephObjectStore(
+                    apiVersion=self.rook_env.api_name,
+                    metadata=dict(
+                        name=name,
+                        namespace=self.rook_env.namespace
+                    ),
+                    spec=cos.Spec(
+                        gateway=cos.Gateway(
+                            port=port,
+                            securePort=secure_port,
+                            instances=spec.placement.count or 1,
+                            placement=cos.Placement(
+                                cos.NodeAffinity(
+                                    requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution(
+                                        nodeSelectorTerms=cos.NodeSelectorTermsList(
+                                            [
+                                                placement_spec_to_node_selector(spec.placement, all_hosts)
+                                            ]
+                                        )
+                                    )
+                                )
+                            )
+                        ),
+                        dataPool=cos.DataPool(
+                            failureDomain=leaf_type,
+                            replicated=cos.Replicated(
+                                size=num_replicas
+                            )
+                        ),
+                        metadataPool=cos.MetadataPool(
+                            failureDomain=leaf_type,
+                            replicated=cos.Replicated(
+                                size=num_replicas
+                            )
+                        )
+                    )
+                )
+            if spec.rgw_zone:
+                object_store.spec.zone = cos.Zone(name=spec.rgw_zone)
+            return object_store
+
+        def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
+            if new.spec.gateway:
+                new.spec.gateway.instances = spec.placement.count or 1
+            else:
+                new.spec.gateway = cos.Gateway(
+                    instances=spec.placement.count or 1
+                )
+            return new
+        return self._create_or_patch(
+            cos.CephObjectStore, 'cephobjectstores', name,
+            _update_zone, _create_zone)
+
+    def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str:
         # TODO use spec.placement
         # TODO warn if spec.extended has entries we don't know how
         #      to action.
-
-        rook_nfsgw = {
-            "apiVersion": self.rook_env.api_name,
-            "kind": "CephNFS",
-            "metadata": {
-                "name": spec.name,
-                "namespace": self.rook_env.namespace
-            },
-            "spec": {
-                "rados": {
-                    "pool": spec.extended["pool"]
-                },
-                "server": {
-                    "active": spec.count,
-                }
-            }
-        }
-
-        if "namespace" in spec.extended:
-            rook_nfsgw["spec"]["rados"]["namespace"] = spec.extended["namespace"]
-
-        with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)):
-            self.rook_api_post("cephnfses/", body=rook_nfsgw)
-
-    def add_objectstore(self, spec):
-        rook_os = {
-            "apiVersion": self.rook_env.api_name,
-            "kind": "CephObjectStore",
-            "metadata": {
-                "name": spec.name,
-                "namespace": self.rook_env.namespace
-            },
-            "spec": {
-                "metadataPool": {
-                    "failureDomain": "host",
-                    "replicated": {
-                        "size": 1
-                    }
-                },
-                "dataPool": {
-                    "failureDomain": "osd",
-                    "replicated": {
-                        "size": 1
-                    }
-                },
-                "gateway": {
-                    "type": "s3",
-                    "port": 80,
-                    "instances": 1,
-                    "allNodes": False
-                }
-            }
-        }
-        
-        with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)):
-            self.rook_api_post("cephobjectstores/", body=rook_os)
-
-    def rm_service(self, service_type, service_id):
-        assert service_type in ("mds", "rgw", "nfs")
-
-        if service_type == "mds":
-            rooktype = "cephfilesystems"
-        elif service_type == "rgw":
-            rooktype = "cephobjectstores"
-        elif service_type == "nfs":
-            rooktype = "cephnfses"
-
+        # TODO Number of pods should be based on the list of hosts in the
+        #      PlacementSpec.
+        assert spec.service_id, "service id in NFS service spec cannot be an empty string or None " # for mypy typing
+        service_id = spec.service_id
+        mgr_module = cast(Module, mgr)
+        count = spec.placement.count or 1
+        def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
+            new.spec.server.active = count
+            return new
+
+        def _create_nfs() -> cnfs.CephNFS:
+            rook_nfsgw = cnfs.CephNFS(
+                    apiVersion=self.rook_env.api_name,
+                    metadata=dict(
+                        name=spec.service_id,
+                        namespace=self.rook_env.namespace,
+                        ),
+                    spec=cnfs.Spec(
+                        rados=cnfs.Rados(
+                            namespace=service_id,
+                            pool=NFS_POOL_NAME,
+                            ),
+                        server=cnfs.Server(
+                            active=count
+                            )
+                        )
+                    )
+            return rook_nfsgw
+
+        create_ganesha_pool(mgr)
+        NFSRados(mgr_module.rados, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
+        return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
+                _update_nfs, _create_nfs)
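+    # For illustration (a sketch, with a hypothetical service id 'mynfs' and
+    # the default placement count of 1), _create_nfs yields a CephNFS CR
+    # roughly equivalent to:
+    #
+    #   apiVersion: ceph.rook.io/v1     # self.rook_env.api_name
+    #   kind: CephNFS
+    #   metadata:
+    #     name: mynfs
+    #     namespace: rook-ceph          # self.rook_env.namespace
+    #   spec:
+    #     rados:
+    #       namespace: mynfs
+    #       pool: .nfs                  # NFS_POOL_NAME
+    #     server:
+    #       active: 1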
+
+    def rm_service(self, rooktype: str, service_id: str) -> str:
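+        """
+        Delete the CR backing a service, e.g. (with a hypothetical service
+        id) ``rm_service('cephnfses', 'mynfs')``.
+        """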
+        self.customObjects_api.delete_namespaced_custom_object(
+            group="ceph.rook.io", version="v1",
+            namespace=self.rook_env.namespace,
+            plural=rooktype, name=service_id)
         objpath = "{0}/{1}".format(rooktype, service_id)
+        return f'Removed {objpath}'
 
-        try:
-            self.rook_api_delete(objpath)
-        except ApiException as e:
-            if e.status == 404:
-                log.info("{0} service '{1}' does not exist".format(service_type, service_id))
-                # Idempotent, succeed.
-            else:
-                raise
+    def get_resource(self, resource_type: str) -> Iterable:
+        custom_objects = KubernetesCustomResource(
+            self.customObjects_api.list_namespaced_custom_object,
+            group="ceph.rook.io", version="v1",
+            namespace=self.rook_env.namespace, plural=resource_type)
+        return custom_objects.items
 
-    def can_create_osd(self):
+    def can_create_osd(self) -> bool:
         current_cluster = self.rook_api_get(
             "cephclusters/{0}".format(self.rook_env.cluster_name))
         use_all_nodes = current_cluster['spec'].get('useAllNodes', False)
@@ -314,129 +1132,418 @@ class RookCluster(object):
         # to anything we put in 'nodes', so can't do OSD creation.
         return not use_all_nodes
 
-    def node_exists(self, node_name):
-        try:
-            self.k8s.read_node(node_name, exact=False, export=True)
-        except ApiException as e:
-            if e.status == 404:
-                return False
-            else:
-                raise
+    def node_exists(self, node_name: str) -> bool:
+        return node_name in self.get_node_names()
+
+    def update_mon_count(self, newcount: Optional[int]) -> str:
+        def _update_mon_count(current, new):
+            # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+            if newcount is None:
+                raise orchestrator.OrchestratorError('unable to set mon count to None')
+            if not new.spec.mon:
+                raise orchestrator.OrchestratorError("mon attribute not specified in new spec")
+            new.spec.mon.count = newcount
+            return new
+        return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
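+    # A sketch of what ends up being applied: if the CR currently has
+    # spec.mon.count == 3 and newcount == 5, the computed JSON patch is
+    #   [{"op": "replace", "path": "/spec/mon/count", "value": 5}]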
+
+    def add_osds(self, drive_group, matching_hosts):
+        # type: (DriveGroupSpec, List[str]) -> str
+        assert drive_group.objectstore in ("bluestore", "filestore")
+        assert drive_group.service_id
+        storage_class = self.get_storage_class()
+        inventory = self.get_discovered_devices()
+        creator: Optional[DefaultCreator] = None
+        if (
+            storage_class.metadata.labels
+            and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
+        ):
+            creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)    
         else:
-            return True
+            creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
+        return self._patch(
+            ccl.CephCluster,
+            'cephclusters',
+            self.rook_env.cluster_name,
+            creator.add_osds(self.rook_pods, drive_group, matching_hosts)
+        )
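+    # A typical caller would pass a drive group such as (hypothetical values):
+    #   DriveGroupSpec(service_id='all_devices',
+    #                  placement=PlacementSpec(host_pattern='*'),
+    #                  data_devices=DeviceSelection(all=True))
+    # plus the hostnames that placement matched; creator.add_osds returns the
+    # callback that _patch applies to the CephCluster CR.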
+
+    def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
+        inventory = self.get_discovered_devices()
+        self.remover = DefaultRemover(
+            self.coreV1_api,
+            self.batchV1_api, 
+            self.appsV1_api, 
+            osd_ids, 
+            replace, 
+            force, 
+            mon_command, 
+            self._patch, 
+            self.rook_env,
+            inventory
+        )
+        return self.remover.remove()
+
+    def get_hosts(self) -> List[orchestrator.HostSpec]:
+        ret = []
+        for node in self.nodes.items:
+            spec = orchestrator.HostSpec(
+                node.metadata.name, 
+                addr='/'.join([addr.address for addr in node.status.addresses]), 
+                labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
+            )
+            ret.append(spec)
+        return ret
+
+    def create_zap_job(self, host: str, path: str) -> None:
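+        # One-shot privileged Job pinned to <host>: if 'ceph-volume raw list'
+        # recognizes <path> as a raw-mode OSD, its header is overwritten with
+        # dd; otherwise it falls back to 'ceph-volume lvm zap --destroy'.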
+        body = client.V1Job(
+            api_version="batch/v1",
+            metadata=client.V1ObjectMeta(
+                name="rook-ceph-device-zap",
+                namespace="rook-ceph"
+            ),
+            spec=client.V1JobSpec(
+                template=client.V1PodTemplateSpec(
+                    spec=client.V1PodSpec(
+                        containers=[
+                            client.V1Container(
+                                name="device-zap",
+                                image="rook/ceph:master",
+                                command=["bash"],
+                                args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
+                                env=[
+                                    client.V1EnvVar(
+                                        name="ROOK_CEPH_USERNAME",
+                                        value_from=client.V1EnvVarSource(
+                                            secret_key_ref=client.V1SecretKeySelector(
+                                                key="ceph-username",
+                                                name="rook-ceph-mon"
+                                            )
+                                        )
+                                    ),
+                                    client.V1EnvVar(
+                                        name="ROOK_CEPH_SECRET",
+                                        value_from=client.V1EnvVarSource(
+                                            secret_key_ref=client.V1SecretKeySelector(
+                                                key="ceph-secret",
+                                                name="rook-ceph-mon"
+                                            )
+                                        )
+                                    )
+                                ],
+                                security_context=client.V1SecurityContext(
+                                    run_as_user=0,
+                                    privileged=True
+                                ),
+                                volume_mounts=[
+                                    client.V1VolumeMount(
+                                        mount_path="/etc/ceph",
+                                        name="ceph-conf-emptydir"
+                                    ),
+                                    client.V1VolumeMount(
+                                        mount_path="/etc/rook",
+                                        name="rook-config"
+                                    ),
+                                    client.V1VolumeMount(
+                                        mount_path="/dev",
+                                        name="devices"
+                                    )
+                                ]
+                            )
+                        ],
+                        volumes=[
+                            client.V1Volume(
+                                name="ceph-conf-emptydir",
+                                empty_dir=client.V1EmptyDirVolumeSource()
+                            ),
+                            client.V1Volume(
+                                name="rook-config",
+                                empty_dir=client.V1EmptyDirVolumeSource()
+                            ),
+                            client.V1Volume(
+                                name="devices",
+                                host_path=client.V1HostPathVolumeSource(
+                                    path="/dev"
+                                )
+                            ),
+                        ],
+                        node_selector={
+                            "kubernetes.io/hostname": host
+                        },
+                        restart_policy="Never"
+                    )
+                )
+            )
+        )
+        self.batchV1_api.create_namespaced_job(self.rook_env.namespace, body)
+
+    def rbd_mirror(self, spec: ServiceSpec) -> None:
+        service_id = spec.service_id or "default-rbd-mirror"
+        all_hosts = self.get_hosts()
+        def _create_rbd_mirror() -> crbdm.CephRBDMirror:
+            return crbdm.CephRBDMirror(
+                apiVersion=self.rook_env.api_name,
+                metadata=dict(
+                    name=service_id,
+                    namespace=self.rook_env.namespace,
+                ),
+                spec=crbdm.Spec(
+                    count=spec.placement.count or 1,
+                    placement=crbdm.Placement(
+                        nodeAffinity=crbdm.NodeAffinity(
+                            requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
+                                nodeSelectorTerms=crbdm.NodeSelectorTermsList(
+                                    [
+                                        placement_spec_to_node_selector(spec.placement, all_hosts)
+                                    ]
+                                )
+                            )
+                        )
+                    )
+                )
+            )
+        def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
+            new.spec.count = spec.placement.count or 1
+            new.spec.placement = crbdm.Placement(
+                nodeAffinity=crbdm.NodeAffinity(
+                    requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
+                        nodeSelectorTerms=crbdm.NodeSelectorTermsList(
+                            [
+                                placement_spec_to_node_selector(spec.placement, all_hosts)
+                            ]
+                        )
+                    )
+                )
+            )
+            return new
+        self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)
+
+    def _patch(self, crd: Type, crd_name: str, cr_name: str,
+               func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
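+        """
+        Fetch the named CR, let ``func(current, new)`` mutate the fresh
+        ``new`` copy, and apply the difference as a JSON patch; returns
+        "No change" if the callback left the resource unmodified.
+        """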
+        current_json = self.rook_api_get(
+            "{}/{}".format(crd_name, cr_name)
+        )
+
+        current = crd.from_json(current_json)
+        new = crd.from_json(current_json)  # fresh object from JSON; no deepcopy needed
+
+        new = func(current, new)
+
+        patch = list(jsonpatch.make_patch(current_json, new.to_json()))
+
+        log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
 
-    def update_mon_count(self, newcount):
-        patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]
+        if len(patch) == 0:
+            return "No change"
 
         try:
             self.rook_api_patch(
-                "cephclusters/{0}".format(self.rook_env.cluster_name),
+                "{}/{}".format(crd_name, cr_name),
                 body=patch)
         except ApiException as e:
             log.exception("API exception: {0}".format(e))
             raise ApplyException(
-                "Failed to update mon count in Cluster CRD: {0}".format(e))
-
-        return "Updated mon count to {0}".format(newcount)
+                "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
 
-    def update_nfs_count(self, svc_id, newcount):
-        patch = [{"op": "replace", "path": "/spec/server/active", "value": newcount}]
+        return "Success"
 
+    def _create_or_patch(self,
+                         crd: Type,
+                         crd_name: str,
+                         cr_name: str,
+                         update_func: Callable[[CrdClassT], CrdClassT],
+                         create_func: Callable[[], CrdClassT]) -> str:
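+        """
+        Idempotent apply: GET the CR and patch it with the changes made by
+        ``update_func``, or, if it does not exist yet (404), create it from
+        scratch via ``create_func``.
+        """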
         try:
-            self.rook_api_patch(
-                "cephnfses/{0}".format(svc_id),
-                body=patch)
+            current_json = self.rook_api_get(
+                "{}/{}".format(crd_name, cr_name)
+            )
         except ApiException as e:
-            log.exception("API exception: {0}".format(e))
-            raise ApplyException(
-                "Failed to update NFS server count for {0}: {1}".format(svc_id, e))
-        return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
-
-    def add_osds(self, drive_group, all_hosts):
-        # type: (orchestrator.DriveGroupSpec, List[str]) -> str
-        """
-        Rook currently (0.8) can only do single-drive OSDs, so we
-        treat all drive groups as just a list of individual OSDs.
-        """
-        block_devices = drive_group.data_devices.paths if drive_group.data_devices else None
-        directories = drive_group.data_directories
-
-        assert drive_group.objectstore in ("bluestore", "filestore")
-
-        # The CRD looks something like this:
-        #     nodes:
-        #       - name: "gravel1.rockery"
-        #         devices:
-        #          - name: "sdb"
-        #         config:
-        #           storeType: bluestore
-
-        current_cluster = self.rook_api_get(
-            "cephclusters/{0}".format(self.rook_env.cluster_name))
-
-        patch = []
+            if e.status == 404:
+                current_json = None
+            else:
+                raise
 
-        # FIXME: this is all not really atomic, because jsonpatch doesn't
-        # let us do "test" operations that would check if items with
-        # matching names were in existing lists.
+        if current_json:
+            new = crd.from_json(current_json)  # fresh object from JSON; no deepcopy needed
 
-        if 'nodes' not in current_cluster['spec']['storage']:
-            patch.append({
-                'op': 'add', 'path': '/spec/storage/nodes', 'value': []
-            })
+            new = update_func(new)
 
-        current_nodes = current_cluster['spec']['storage'].get('nodes', [])
+            patch = list(jsonpatch.make_patch(current_json, new.to_json()))
 
-        if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]:
-            pd = { "name": drive_group.hosts(all_hosts)[0],
-                   "config": { "storeType": drive_group.objectstore }}
+            log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
 
-            if block_devices:
-                pd["devices"] = [{'name': d} for d in block_devices]
-            if directories:
-                pd["directories"] = [{'path': p} for p in directories]
+            if len(patch) == 0:
+                return "No change"
 
-            patch.append({ "op": "add", "path": "/spec/storage/nodes/-", "value": pd })
+            try:
+                self.rook_api_patch(
+                    "{}/{}".format(crd_name, cr_name),
+                    body=patch)
+            except ApiException as e:
+                log.exception("API exception: {0}".format(e))
+                raise ApplyException(
+                    "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
+            return "Updated"
         else:
-            # Extend existing node
-            node_idx = None
-            current_node = None
-            for i, c in enumerate(current_nodes):
-                if c['name'] == drive_group.hosts(all_hosts)[0]:
-                    current_node = c
-                    node_idx = i
-                    break
-
-            assert node_idx is not None
-            assert current_node is not None
-
-            new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']]))
-            for n in new_devices:
-                patch.append({
-                    "op": "add",
-                    "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx),
-                    "value": {'name': n}
-                })
-
-            new_dirs = list(set(directories) - set(current_node['directories']))
-            for p in new_dirs:
-                patch.append({
-                    "op": "add",
-                    "path": "/spec/storage/nodes/{0}/directories/-".format(node_idx),
-                    "value": {'path': p}
-                })
+            new = create_func()
+            with self.ignore_409("{} {} already exists".format(crd_name,
+                                                               cr_name)):
+                self.rook_api_post("{}/".format(crd_name),
+                                   body=new.to_json())
+            return "Created"
+
+    def get_ceph_image(self) -> str:
+        try:
+            api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+                                                               label_selector="app=rook-ceph-mon",
+                                                               timeout_seconds=10)
+            if api_response.items:
+                return api_response.items[-1].spec.containers[0].image
+            else:
+                raise orchestrator.OrchestratorError(
+                        "Error getting ceph image: the cluster has no monitor pods")
+        except ApiException as e:
+            raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
+
+
+    def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
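+        """
+        Toggle a device's ident/fault LED by running
+        ``lsmcli local-disk-<ident|fault>-led-<on|off> --path <dev>`` in a
+        short-lived privileged Job pinned to ``loc.host``; the pod log is
+        returned as the result message.
+        """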
+        operation_id = str(hash(loc))
+        message = ""
+
+        # job definition
+        job_metadata = client.V1ObjectMeta(name=operation_id,
+                                           namespace=self.rook_env.namespace,
+                                           labels={"ident": operation_id})
+        pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
+        pod_container = client.V1Container(name="ceph-lsmcli-command",
+                                           security_context=client.V1SecurityContext(privileged=True),
+                                           image=self.get_ceph_image(),
+                                           command=["lsmcli",],
+                                           args=['local-disk-%s-led-%s' % (ident_fault,'on' if on else 'off'),
+                                                 '--path', loc.path or loc.dev,],
+                                           volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
+                                                          client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
+        pod_spec = client.V1PodSpec(containers=[pod_container],
+                                    active_deadline_seconds=30, # Max time to terminate pod
+                                    restart_policy="Never",
+                                    node_selector={"kubernetes.io/hostname": loc.host},
+                                    volumes=[client.V1Volume(name="devices",
+                                                             host_path=client.V1HostPathVolumeSource(path="/dev")),
+                                             client.V1Volume(name="run-udev",
+                                                             host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
+        pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
+                                                  spec=pod_spec)
+        job_spec = client.V1JobSpec(active_deadline_seconds=60, # Max time to terminate job
+                                    ttl_seconds_after_finished=10, # Alpha. Lifetime after finishing (either Complete or Failed)
+                                    backoff_limit=0,
+                                    template=pod_template)
+        job = client.V1Job(api_version="batch/v1",
+                           kind="Job",
+                           metadata=job_metadata,
+                           spec=job_spec)
+
+        # delete previous job if it exists
+        try:
+            try:
+                api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+                                                                      self.rook_env.namespace,
+                                                                      propagation_policy="Background")
+            except ApiException as e:
+                if e.status != 404: # No problem if the job does not exist
+                    raise
+
+            # wait until the job is not present
+            deleted = False
+            retries = 0
+            while not deleted and retries < 10:
+                api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
+                                                                    label_selector="ident=%s" % operation_id,
+                                                                    timeout_seconds=10)
+                deleted = not api_response.items
+                if retries > 5:
+                    sleep(0.1)
+                retries += 1
+            if retries == 10 and not deleted:
+                raise orchestrator.OrchestratorError(
+                    "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
+                            on, loc.host, loc.path or loc.dev, operation_id))
+
+            # create the job
+            api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)
+
+            # get the result
+            finished = False
+            while not finished:
+                api_response = self.batchV1_api.read_namespaced_job(operation_id,
+                                                                    self.rook_env.namespace)
+                finished = api_response.status.succeeded or api_response.status.failed
+                if finished:
+                    message = api_response.status.conditions[-1].message
+
+            # get the result of the lsmcli command
+            api_response=self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+                                                             label_selector="ident=%s" % operation_id,
+                                                             timeout_seconds=10)
+            if api_response.items:
+                pod_name = api_response.items[-1].metadata.name
+                message = self.coreV1_api.read_namespaced_pod_log(pod_name,
+                                                                  self.rook_env.namespace)
 
-        if len(patch) == 0:
-            return "No change"
+        except ApiException as e:
+            log.exception('K8s API failed. {}'.format(e))
+            raise
 
+        # Finally, delete the job. The job sets <ttl_seconds_after_finished>,
+        # so the TTL controller deletes it automatically once it finishes;
+        # that feature is still Alpha, so we also delete the job explicitly.
         try:
-            self.rook_api_patch(
-                "cephclusters/{0}".format(self.rook_env.cluster_name),
-                body=patch)
+            api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+                                                                  self.rook_env.namespace,
+                                                                  propagation_policy="Background")
         except ApiException as e:
-            log.exception("API exception: {0}".format(e))
-            raise ApplyException(
-                "Failed to create OSD entries in Cluster CRD: {0}".format(
-                    e))
+            if e.status != 404: # No problem if the job does not exist
+                raise
 
-        return "Success"
+        return message
+
+    def blink_light(self, ident_fault, on, locs):
+        # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
+        return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
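+    # Usage sketch (hypothetical host and device): light the fault LED of
+    # /dev/sdb on node1:
+    #   cluster.blink_light('fault', True,
+    #       [orchestrator.DeviceLightLoc(host='node1', dev='sdb', path='/dev/sdb')])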
+
+def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem:
+    all_hostnames = [hs.hostname for hs in all_hosts]
+    res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList())
+    if spec.host_pattern and spec.host_pattern != "*":
+        raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements")
+    if spec.label:
+        res.matchExpressions.append(
+            ccl.MatchExpressionsItem(
+                key="ceph-label/" + spec.label,
+                operator="Exists"
+            )
+        )
+    if spec.hosts:
+        host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames]
+        res.matchExpressions.append(
+            ccl.MatchExpressionsItem(
+                key="kubernetes.io/hostname",
+                operator="In",
+                values=ccl.CrdObjectList(host_list)
+            )
+        ) 
+    if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern):
+        res.matchExpressions.append(
+            ccl.MatchExpressionsItem(
+                key="kubernetes.io/hostname",
+                operator="Exists",
+            )
+        )
+    return res
+
+def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec:
+    res = PlacementSpec()
+    for expression in node_selector.matchExpressions:
+        if expression.key.startswith("ceph-label/"):
+            res.label = expression.key.split('/')[1]
+        elif expression.key == "kubernetes.io/hostname":
+            if expression.operator == "Exists":
+                res.host_pattern = "*"
+            elif expression.operator == "In":
+                res.hosts = [HostPlacementSpec(hostname=value, network='', name='')
+                             for value in expression.values]
+    return res
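+# The two helpers above are inverses for the placements Rook supports; a
+# sketch of the round trip for a label-based placement:
+#   PlacementSpec(label='mds')
+#     -> matchExpressions [{key: "ceph-label/mds", operator: "Exists"}]
+#     -> PlacementSpec(label='mds')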