This module is runnable outside of ceph-mgr, useful for testing.
"""
+import datetime
+import threading
import logging
-import json
from contextlib import contextmanager
+from time import sleep
+import re
+from orchestrator import OrchResult
-from six.moves.urllib.parse import urljoin # pylint: disable=import-error
+import jsonpatch
+from urllib.parse import urljoin
+import json
# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
+from urllib3.exceptions import ProtocolError
+
+from ceph.deployment.inventory import Device
+from ceph.deployment.drive_group import DriveGroupSpec
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
+from ceph.utils import datetime_now
+from ceph.deployment.drive_selection.matchers import SizeMatcher
+from nfs.cluster import create_ganesha_pool
+from nfs.module import Module
+from nfs.export import NFSRados
+from mgr_module import NFS_POOL_NAME
+from mgr_util import merge_dicts
+
+from typing import Optional, Tuple, TypeVar, List, Callable, Any, cast, Generic, \
+ Iterable, Dict, Iterator, Type
+
try:
+ from kubernetes import client, watch
from kubernetes.client.rest import ApiException
except ImportError:
- ApiException = None
+ class ApiException(Exception): # type: ignore
+ status = 0
+
+from .rook_client.ceph import cephfilesystem as cfs
+from .rook_client.ceph import cephnfs as cnfs
+from .rook_client.ceph import cephobjectstore as cos
+from .rook_client.ceph import cephcluster as ccl
+from .rook_client.ceph import cephrbdmirror as crbdm
+from .rook_client._helper import CrdClass
+
+import orchestrator
try:
- import orchestrator
- from rook.module import RookEnv
- from typing import List
+ from rook.module import RookEnv, RookOrchestrator
except ImportError:
pass # just used for type checking.
+T = TypeVar('T')
+FuncT = TypeVar('FuncT', bound=Callable)
+
+CrdClassT = TypeVar('CrdClassT', bound=CrdClass)
+
+
log = logging.getLogger(__name__)
-class ApplyException(Exception):
+def __urllib3_supports_read_chunked() -> bool:
+ # There is a bug in CentOS 7 as it ships a urllib3 which is lower
+ # than required by kubernetes-client
+ try:
+ from urllib3.response import HTTPResponse
+ return hasattr(HTTPResponse, 'read_chunked')
+ except ImportError:
+ return False
+
+
+_urllib3_supports_read_chunked = __urllib3_supports_read_chunked()
+
+class ApplyException(orchestrator.OrchestratorError):
"""
For failures to update the Rook CRDs, usually indicating
    some kind of interference between our attempted update
    and another automated agent.
    """
+def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
+ def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
+ t = threading.Thread(target=f, args=args, kwargs=kwargs)
+ t.start()
+ return t
+
+ return cast(Callable[..., threading.Thread], wrapper)
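+# Note: `threaded` is applied below (see KubernetesResource._watch); calling the
+# decorated function starts the worker thread and returns it instead of blocking.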
+
+
+class DefaultFetcher():
+ def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
+ self.storage_class = storage_class
+ self.coreV1_api = coreV1_api
+
+ def fetch(self) -> None:
+ self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
+ self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]
+
+ def convert_size(self, size_str: str) -> int:
+ units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
+        coeff_and_unit = re.search(r'(\d+)(\D+)', size_str)
+ assert coeff_and_unit is not None
+ coeff = int(coeff_and_unit[1])
+ unit = coeff_and_unit[2]
+ try:
+ factor = units.index(unit) % 7
+ except ValueError:
+ log.error("PV size format invalid")
+ raise
+ size = coeff * (2 ** (10 * factor))
+ return size
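+    # Example (sketch): convert_size("10Gi") returns 10 * 2**30. Plain SI
+    # suffixes ("10G") resolve to the same binary factor, because the second
+    # half of the `units` tuple maps onto the first via the `% 7` above.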
+
+ def devices(self) -> Dict[str, List[Device]]:
+ nodename_to_devices: Dict[str, List[Device]] = {}
+ for i in self.pvs_in_sc:
+ node, device = self.device(i)
+ if node not in nodename_to_devices:
+ nodename_to_devices[node] = []
+ nodename_to_devices[node].append(device)
+ return nodename_to_devices
+
+ def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
+ node = 'N/A'
+ if i.spec.node_affinity:
+ terms = i.spec.node_affinity.required.node_selector_terms
+            if (
+                len(terms) == 1
+                and len(terms[0].match_expressions) == 1
+                and terms[0].match_expressions[0].key == 'kubernetes.io/hostname'
+                and len(terms[0].match_expressions[0].values) == 1
+            ):
+ node = terms[0].match_expressions[0].values[0]
+ size = self.convert_size(i.spec.capacity['storage'])
+        if i.spec.host_path:
+            path = i.spec.host_path.path
+        elif i.spec.local:
+            path = i.spec.local.path
+        elif i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations:
+            path = '/dev/' + i.metadata.annotations['storage.openshift.com/device-name']
+        else:
+            path = ''
+ state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
+ pv_name = i.metadata.name
+ device = Device(
+ path = path,
+ sys_api = dict(
+ size = size,
+ node = node,
+ pv_name = pv_name
+ ),
+ available = state,
+ )
+ return (node, device)
+
+
+class LSOFetcher(DefaultFetcher):
+ def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
+ super().__init__(storage_class, coreV1_api)
+ self.customObjects_api = customObjects_api
+ self.nodenames = nodenames
+
+ def fetch(self) -> None:
+ super().fetch()
+ self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
+ group="local.storage.openshift.io",
+ version="v1alpha1",
+ plural="localvolumediscoveryresults")
+
+    def predicate(self, item: Dict[str, Any]) -> bool:
+ if self.nodenames is not None:
+ return item['spec']['nodeName'] in self.nodenames
+ else:
+ return True
+
+ def devices(self) -> Dict[str, List[Device]]:
+ try:
+ lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
+        except ApiException:
+ log.error("Failed to fetch device metadata")
+ raise
+ self.lso_devices = {}
+ for i in lso_discovery_results:
+ drives = i['status']['discoveredDevices']
+ for drive in drives:
+ self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
+ nodename_to_devices: Dict[str, List[Device]] = {}
+ for i in self.pvs_in_sc:
+ node, device = (None, None)
+            if (
+                not i.metadata.annotations
+                or 'storage.openshift.com/device-id' not in i.metadata.annotations
+                or i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices
+            ):
+ node, device = super().device(i)
+ else:
+ node, device = self.device(i)
+ if node not in nodename_to_devices:
+ nodename_to_devices[node] = []
+ nodename_to_devices[node].append(device)
+ return nodename_to_devices
+
+ def device(self, i: Any) -> Tuple[str, Device]:
+ node = i.metadata.labels['kubernetes.io/hostname']
+ device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
+ pv_name = i.metadata.name
+ vendor: str = device_discovery['model'].split()[0] if len(device_discovery['model'].split()) >= 1 else ''
+ model: str = ' '.join(device_discovery['model'].split()[1:]) if len(device_discovery['model'].split()) > 1 else ''
+ device = Device(
+ path = device_discovery['path'],
+ sys_api = dict(
+ size = device_discovery['size'],
+ rotational = '1' if device_discovery['property']=='Rotational' else '0',
+ node = node,
+ pv_name = pv_name,
+ model = model,
+ vendor = vendor
+ ),
+ available = device_discovery['status']['state']=='Available',
+ device_id = device_discovery['deviceID'].split('/')[-1],
+ lsm_data = dict(
+ serialNum = device_discovery['serial']
+ )
+ )
+ return (node, device)
+
+
+class PDFetcher(DefaultFetcher):
+ """ Physical Devices Fetcher"""
+ def __init__(self, coreV1_api: 'client.CoreV1Api'):
+ self.coreV1_api = coreV1_api
+
+ def fetch(self) -> None:
+ """ Collect the devices information from k8s configmaps"""
+ self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
+ namespace='rook-ceph',
+ label_selector='app=rook-discover')
+
+ def devices(self) -> Dict[str, List[Device]]:
+ """ Return the list of devices found"""
+ node_devices: Dict[str, List[Device]] = {}
+ for i in self.dev_cms.items:
+ devices_list: List[Device] = []
+ for d in json.loads(i.data['devices']):
+ devices_list.append(self.device(d)[1])
+ node_devices[i.metadata.labels['rook.io/node']] = devices_list
+
+ return node_devices
+
+ def device(self, devData: Dict[str,str]) -> Tuple[str, Device]:
+ """ Build an orchestrator device """
+ if 'cephVolumeData' in devData and devData['cephVolumeData']:
+ return "", Device.from_json(json.loads(devData['cephVolumeData']))
+ else:
+ return "", Device(
+ path='/dev/' + devData['name'],
+ sys_api=dict(
+ rotational='1' if devData['rotational'] else '0',
+ size=devData['size']
+ ),
+ available=False,
+ rejected_reasons=['device data coming from ceph-volume not provided'],
+ )
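+    # The configmap payload parsed in devices() is the JSON written by the
+    # rook-discover daemon. A minimal sketch of one entry, limited to the keys
+    # read above (values illustrative, not a full schema):
+    #   {"name": "sdb", "rotational": "1", "size": "64424509440",
+    #    "cephVolumeData": "<ceph-volume inventory JSON, optional>"}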
+
+
+class KubernetesResource(Generic[T]):
+ def __init__(self, api_func: Callable, **kwargs: Any) -> None:
+ """
+ Generic kubernetes Resource parent class
+
+ The api fetch and watch methods should be common across resource types,
+
+ Exceptions in the runner thread are propagated to the caller.
+
+ :param api_func: kubernetes client api function that is passed to the watcher
+ :param filter_func: signature: ``(Item) -> bool``.
+ """
+ self.kwargs = kwargs
+ self.api_func = api_func
+
+ # ``_items`` is accessed by different threads. I assume assignment is atomic.
+ self._items: Dict[str, T] = dict()
+ self.thread = None # type: Optional[threading.Thread]
+ self.exception: Optional[Exception] = None
+ if not _urllib3_supports_read_chunked:
+            log.info('urllib3 is too old. Fallback to full fetches')
+
+ def _fetch(self) -> str:
+ """ Execute the requested api method as a one-off fetch"""
+ response = self.api_func(**self.kwargs)
+ metadata = response.metadata
+ self._items = {item.metadata.name: item for item in response.items}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+ return metadata.resource_version
+
+ @property
+ def items(self) -> Iterable[T]:
+ """
+ Returns the items of the request.
+ Creates the watcher as a side effect.
+ :return:
+ """
+ if self.exception:
+ e = self.exception
+ self.exception = None
+ raise e # Propagate the exception to the user.
+ if not self.thread or not self.thread.is_alive():
+ resource_version = self._fetch()
+ if _urllib3_supports_read_chunked:
+ # Start a thread which will use the kubernetes watch client against a resource
+ log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
+ self.thread = self._watch(resource_version)
+
+ return self._items.values()
+
+ def get_item_name(self, item: Any) -> Any:
+ try:
+ return item.metadata.name
+ except AttributeError:
+ raise AttributeError(
+ "{} doesn't contain a metadata.name. Unable to track changes".format(
+ self.api_func))
+
+    @threaded
+ def _watch(self, res_ver: Optional[str]) -> None:
+ """ worker thread that runs the kubernetes watch """
+
+ self.exception = None
+
+ w = watch.Watch()
+
+ try:
+ # execute generator to continually watch resource for changes
+ for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
+ **self.kwargs):
+ self.health = ''
+ item = event['object']
+ name = self.get_item_name(item)
+
+ log.info('{} event: {}'.format(event['type'], name))
+
+ if event['type'] in ('ADDED', 'MODIFIED'):
+ self._items = merge_dicts(self._items, {name: item})
+ elif event['type'] == 'DELETED':
+ self._items = {k:v for k,v in self._items.items() if k != name}
+ elif event['type'] == 'BOOKMARK':
+ pass
+ elif event['type'] == 'ERROR':
+ raise ApiException(str(event))
+ else:
+ raise KeyError('Unknown watch event {}'.format(event['type']))
+ except ProtocolError as e:
+ if 'Connection broken' in str(e):
+ log.info('Connection reset.')
+ return
+ raise
+ except ApiException as e:
+ log.exception('K8s API failed. {}'.format(self.api_func))
+ self.exception = e
+ raise
+ except Exception as e:
+ log.exception("Watcher failed. ({})".format(self.api_func))
+ self.exception = e
+ raise
+
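+# Usage sketch for KubernetesResource (assumes a configured kubernetes client;
+# mirrors how RookCluster wires it up below):
+#
+#   osd_pods = KubernetesResource(coreV1_api.list_namespaced_pod,
+#                                 namespace='rook-ceph',
+#                                 label_selector='app=rook-ceph-osd')
+#   for p in osd_pods.items:  # first access does a full fetch and, if urllib3
+#       ...                   # supports chunked reads, starts the watch thread
+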
+class KubernetesCustomResource(KubernetesResource):
+ def _fetch(self) -> str:
+ response = self.api_func(**self.kwargs)
+ metadata = response['metadata']
+ self._items = {item['metadata']['name']: item for item in response['items']}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+ return metadata['resourceVersion']
+
+ def get_item_name(self, item: Any) -> Any:
+ try:
+ return item['metadata']['name']
+        except (KeyError, TypeError):
+ raise AttributeError(
+ "{} doesn't contain a metadata.name. Unable to track changes".format(
+ self.api_func))
+
+class DefaultCreator():
+ def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
+ self.coreV1_api = coreV1_api
+ self.storage_class = storage_class
+ self.inventory = inventory
+
+ def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
+ device_set = ccl.StorageClassDeviceSetsItem(
+ name=d.sys_api['pv_name'],
+ volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
+ count=1,
+ encrypted=drive_group.encrypted,
+ portable=False
+ )
+ device_set.volumeClaimTemplates.append(
+ ccl.VolumeClaimTemplatesItem(
+ metadata=ccl.Metadata(
+ name="data"
+ ),
+ spec=ccl.Spec(
+ storageClassName=self.storage_class,
+ volumeMode="Block",
+ accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
+ resources={
+ "requests":{
+ "storage": 1
+ }
+ },
+ volumeName=d.sys_api['pv_name']
+ )
+ )
+ )
+ return device_set
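+    # Each selected PV becomes its own single-OSD StorageClassDeviceSet whose
+    # claim is pinned to that PV via volumeName, so Rook consumes exactly the
+    # devices chosen by filter_devices().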
+
+ def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+ device_list = []
+ assert drive_group.data_devices is not None
+ sizematcher: Optional[SizeMatcher] = None
+ if drive_group.data_devices.size:
+ sizematcher = SizeMatcher('size', drive_group.data_devices.size)
+ limit = getattr(drive_group.data_devices, 'limit', None)
+ count = 0
+ all = getattr(drive_group.data_devices, 'all', None)
+ paths = [device.path for device in drive_group.data_devices.paths]
+ osd_list = []
+ for pod in rook_pods.items:
+ if (
+ hasattr(pod, 'metadata')
+ and hasattr(pod.metadata, 'labels')
+ and 'osd' in pod.metadata.labels
+ and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
+ ):
+ osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+ for _, node in self.inventory.items():
+ for device in node:
+ if device.sys_api['pv_name'] in osd_list:
+ count += 1
+ for _, node in self.inventory.items():
+ for device in node:
+ if not limit or (count < limit):
+ if device.available:
+ if (
+ all
+ or (
+ device.sys_api['node'] in matching_hosts
+                                and (sizematcher is None or sizematcher.compare(device))
+ and (
+ not drive_group.data_devices.paths
+ or (device.path in paths)
+ )
+ )
+ ):
+ device_list.append(device)
+ count += 1
+
+ return device_list
+
+ def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
+ to_create = self.filter_devices(rook_pods, drive_group,matching_hosts)
+ assert drive_group.data_devices is not None
+ def _add_osds(current_cluster, new_cluster):
+ # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+ if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
+ new_cluster.spec.storage = ccl.Storage()
+
+ if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
+ new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
+
+ existing_scds = [
+ scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
+ ]
+ for device in to_create:
+ new_scds = self.device_to_device_set(drive_group, device)
+ if new_scds.name not in existing_scds:
+ new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
+ return new_cluster
+ return _add_osds
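+    # Note: add_osds() returns a closure rather than applying the change itself;
+    # RookCluster._patch later calls it with the current and new CephCluster CRs
+    # and turns the resulting difference into a JSON patch.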
+
+class LSOCreator(DefaultCreator):
+ def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+ device_list = []
+ assert drive_group.data_devices is not None
+ sizematcher = None
+ if drive_group.data_devices.size:
+ sizematcher = SizeMatcher('size', drive_group.data_devices.size)
+ limit = getattr(drive_group.data_devices, 'limit', None)
+ all = getattr(drive_group.data_devices, 'all', None)
+ paths = [device.path for device in drive_group.data_devices.paths]
+ vendor = getattr(drive_group.data_devices, 'vendor', None)
+ model = getattr(drive_group.data_devices, 'model', None)
+ count = 0
+ osd_list = []
+ for pod in rook_pods.items:
+ if (
+ hasattr(pod, 'metadata')
+ and hasattr(pod.metadata, 'labels')
+ and 'osd' in pod.metadata.labels
+ and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
+ ):
+ osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+ for _, node in self.inventory.items():
+ for device in node:
+ if device.sys_api['pv_name'] in osd_list:
+ count += 1
+ for _, node in self.inventory.items():
+ for device in node:
+ if not limit or (count < limit):
+ if device.available:
+ if (
+ all
+ or (
+ device.sys_api['node'] in matching_hosts
+                                and (sizematcher is None or sizematcher.compare(device))
+ and (
+ not drive_group.data_devices.paths
+ or device.path in paths
+ )
+ and (
+ not vendor
+ or device.sys_api['vendor'] == vendor
+ )
+ and (
+ not model
+                                    or device.sys_api['model'].startswith(model)
+ )
+ )
+ ):
+ device_list.append(device)
+ count += 1
+ return device_list
+
+class DefaultRemover():
+ def __init__(
+ self,
+ coreV1_api: 'client.CoreV1Api',
+ batchV1_api: 'client.BatchV1Api',
+ appsV1_api: 'client.AppsV1Api',
+ osd_ids: List[str],
+ replace_flag: bool,
+ force_flag: bool,
+ mon_command: Callable,
+ patch: Callable,
+ rook_env: 'RookEnv',
+ inventory: Dict[str, List[Device]]
+ ):
+ self.batchV1_api = batchV1_api
+ self.appsV1_api = appsV1_api
+ self.coreV1_api = coreV1_api
+
+ self.osd_ids = osd_ids
+ self.replace_flag = replace_flag
+ self.force_flag = force_flag
+
+ self.mon_command = mon_command
+
+ self.patch = patch
+ self.rook_env = rook_env
+
+ self.inventory = inventory
+ self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
+ self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
+ self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')
+
+
+ def remove_device_sets(self) -> str:
+ self.to_remove: Dict[str, int] = {}
+ self.pvc_to_remove: List[str] = []
+ for pod in self.osd_pods.items:
+ if (
+ hasattr(pod, 'metadata')
+ and hasattr(pod.metadata, 'labels')
+ and 'osd' in pod.metadata.labels
+ and pod.metadata.labels['osd'] in self.osd_ids
+ ):
+ if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
+ self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
+ else:
+ self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
+ self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
+ def _remove_osds(current_cluster, new_cluster):
+ # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+ assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
+ for _set in new_cluster.spec.storage.storageClassDeviceSets:
+ if _set.name in self.to_remove:
+ if _set.count == self.to_remove[_set.name]:
+ new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
+ else:
+ _set.count = _set.count - self.to_remove[_set.name]
+ return new_cluster
+ return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
+
+ def check_force(self) -> None:
+ if not self.force_flag:
+ safe_args = {'prefix': 'osd safe-to-destroy',
+ 'ids': [str(x) for x in self.osd_ids]}
+ ret, out, err = self.mon_command(safe_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def set_osds_down(self) -> None:
+ down_flag_args = {
+ 'prefix': 'osd down',
+ 'ids': [str(x) for x in self.osd_ids]
+ }
+ ret, out, err = self.mon_command(down_flag_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def scale_deployments(self) -> None:
+ for osd_id in self.osd_ids:
+ self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
+ spec=client.V1ScaleSpec(
+ replicas=0
+ )
+ ))
+
+ def set_osds_out(self) -> None:
+ out_flag_args = {
+ 'prefix': 'osd out',
+ 'ids': [str(x) for x in self.osd_ids]
+ }
+ ret, out, err = self.mon_command(out_flag_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def delete_deployments(self) -> None:
+ for osd_id in self.osd_ids:
+ self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')
+
+ def clean_up_prepare_jobs_and_pvc(self) -> None:
+ for job in self.jobs.items:
+ if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
+ self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
+ self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')
+
+ def purge_osds(self) -> None:
+ for id in self.osd_ids:
+ purge_args = {
+ 'prefix': 'osd purge-actual',
+ 'id': int(id),
+ 'yes_i_really_mean_it': True
+ }
+ ret, out, err = self.mon_command(purge_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def destroy_osds(self) -> None:
+ for id in self.osd_ids:
+ destroy_args = {
+ 'prefix': 'osd destroy-actual',
+ 'id': int(id),
+ 'yes_i_really_mean_it': True
+ }
+ ret, out, err = self.mon_command(destroy_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def remove(self) -> str:
+ try:
+ self.check_force()
+ except Exception as e:
+ log.exception("Error checking if OSDs are safe to destroy")
+ return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
+ try:
+ remove_result = self.remove_device_sets()
+ except Exception as e:
+ log.exception("Error patching ceph cluster CRD")
+ return f"Not possible to modify Ceph cluster CRD: {e}"
+ try:
+ self.scale_deployments()
+ self.delete_deployments()
+ self.clean_up_prepare_jobs_and_pvc()
+ except Exception as e:
+ log.exception("Ceph cluster CRD patched, but error cleaning environment")
+ return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
+ try:
+ self.set_osds_down()
+ self.set_osds_out()
+ if self.replace_flag:
+ self.destroy_osds()
+ else:
+ self.purge_osds()
+ except Exception as e:
+ log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
+ return f"Error removing OSDs from Ceph cluster: {e}"
+
+ return remove_result
+
+
+
class RookCluster(object):
- def __init__(self, k8s, rook_env):
+    # The import of client.CoreV1Api must be optional at import time so that
+    # mgr/rook can still be imported when the kubernetes client is unavailable.
+ def __init__(
+ self,
+ coreV1_api: 'client.CoreV1Api',
+ batchV1_api: 'client.BatchV1Api',
+ customObjects_api: 'client.CustomObjectsApi',
+ storageV1_api: 'client.StorageV1Api',
+ appsV1_api: 'client.AppsV1Api',
+ rook_env: 'RookEnv',
+ storage_class: 'str'
+ ):
self.rook_env = rook_env # type: RookEnv
- self.k8s = k8s
-
- def rook_url(self, path):
+ self.coreV1_api = coreV1_api # client.CoreV1Api
+ self.batchV1_api = batchV1_api
+ self.customObjects_api = customObjects_api
+ self.storageV1_api = storageV1_api # client.StorageV1Api
+ self.appsV1_api = appsV1_api # client.AppsV1Api
+ self.storage_class = storage_class # type: str
+
+ # TODO: replace direct k8s calls with Rook API calls
+ self.storage_classes : KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
+
+ self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
+ namespace=self.rook_env.namespace,
+ label_selector="rook_cluster={0}".format(
+ self.rook_env.namespace))
+ self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
+
+ def rook_url(self, path: str) -> str:
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
self.rook_env.crd_version, self.rook_env.namespace)
return urljoin(prefix, path)
- def rook_api_call(self, verb, path, **kwargs):
+ def rook_api_call(self, verb: str, path: str, **kwargs: Any) -> Any:
full_path = self.rook_url(path)
log.debug("[%s] %s" % (verb, full_path))
- return self.k8s.api_client.call_api(
+ return self.coreV1_api.api_client.call_api(
full_path,
verb,
auth_settings=['BearerToken'],
_preload_content=True,
**kwargs)
- def rook_api_get(self, path, **kwargs):
+ def rook_api_get(self, path: str, **kwargs: Any) -> Any:
return self.rook_api_call("GET", path, **kwargs)
- def rook_api_delete(self, path):
+ def rook_api_delete(self, path: str) -> Any:
return self.rook_api_call("DELETE", path)
- def rook_api_patch(self, path, **kwargs):
+ def rook_api_patch(self, path: str, **kwargs: Any) -> Any:
return self.rook_api_call("PATCH", path,
header_params={"Content-Type": "application/json-patch+json"},
**kwargs)
- def rook_api_post(self, path, **kwargs):
+ def rook_api_post(self, path: str, **kwargs: Any) -> Any:
return self.rook_api_call("POST", path, **kwargs)
- def get_discovered_devices(self, nodenames=None):
- # TODO: replace direct k8s calls with Rook API calls
- # when they're implemented
- label_selector = "app=rook-discover"
- if nodenames is not None:
- # FIXME: is there a practical or official limit on the
- # number of entries in a label selector
- label_selector += ", rook.io/node in ({0})".format(
- ", ".join(nodenames))
-
- try:
- result = self.k8s.list_namespaced_config_map(
- self.rook_env.operator_namespace,
- label_selector=label_selector)
- except ApiException as e:
- log.exception("Failed to fetch device metadata: {0}".format(e))
- raise
-
- nodename_to_devices = {}
- for i in result.items:
- drives = json.loads(i.data['devices'])
- nodename_to_devices[i.metadata.labels['rook.io/node']] = drives
+ def get_storage_class(self) -> 'client.V1StorageClass':
+ matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
+ if len(matching_sc) == 0:
+ log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
+ raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
+ return matching_sc[0]
+
+ def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
+ self.fetcher: Optional[DefaultFetcher] = None
+ op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
+ if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
+ self.fetcher = PDFetcher(self.coreV1_api)
+ else:
+ storage_class = self.get_storage_class()
+ if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
+ self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
+ else:
+ self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
- return nodename_to_devices
+ self.fetcher.fetch()
+ return self.fetcher.devices()
- def get_nfs_conf_url(self, nfs_cluster, instance):
+ def get_osds(self) -> List:
+ osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
+ return list(osd_pods.items)
+
+ def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
#
# Fetch cephnfs object for "nfs_cluster" and then return a rados://
# URL for the instance within that cluster. If the fetch fails, just
url = "rados://{0}/{1}/conf-{2}.{3}".format(pool, namespace, nfs_cluster, instance)
return url
+ def describe_pods(self,
+ service_type: Optional[str],
+ service_id: Optional[str],
+ nodename: Optional[str]) -> List[Dict[str, Any]]:
+ """
+        Go query the k8s API about deployments and containers related to this
+        filesystem.
- def describe_pods(self, service_type, service_id, nodename):
- # Go query the k8s API about deployment, containers related to this
- # filesystem
-
- # Inspect the Rook YAML, to decide whether this filesystem
- # is Ceph-managed or Rook-managed
- # TODO: extend Orchestrator interface to describe whether FS
- # is manageable by us or not
-
- # Example Rook Pod labels for a mgr daemon:
- # Labels: app=rook-ceph-mgr
- # pod-template-hash=2171958073
- # rook_cluster=rook
- # And MDS containers additionally have `rook_filesystem` label
-
- # Label filter is rook_cluster=<cluster name>
- # rook_file_system=<self.fs_name>
-
- label_filter = "rook_cluster={0}".format(self.rook_env.cluster_name)
- if service_type != None:
- label_filter += ",app=rook-ceph-{0}".format(service_type)
- if service_id != None:
- if service_type == "mds":
- label_filter += ",rook_file_system={0}".format(service_id)
- elif service_type == "osd":
- # Label added in https://github.com/rook/rook/pull/1698
- label_filter += ",ceph-osd-id={0}".format(service_id)
- elif service_type == "mon":
- # label like mon=rook-ceph-mon0
- label_filter += ",mon={0}".format(service_id)
- elif service_type == "mgr":
- label_filter += ",mgr={0}".format(service_id)
- elif service_type == "nfs":
- label_filter += ",ceph_nfs={0}".format(service_id)
- elif service_type == "rgw":
- # TODO: rgw
- pass
-
- field_filter = ""
- if nodename != None:
- field_filter = "spec.nodeName={0}".format(nodename)
+ Example Rook Pod labels for a mgr daemon:
+ Labels: app=rook-ceph-mgr
+ pod-template-hash=2171958073
+ rook_cluster=rook
+ And MDS containers additionally have `rook_filesystem` label
- pods = self.k8s.list_namespaced_pod(
- self.rook_env.namespace,
- label_selector=label_filter,
- field_selector=field_filter)
+ Label filter is rook_cluster=<cluster namespace>
+ rook_file_system=<self.fs_name>
+ """
+ def predicate(item):
+ # type: (client.V1Pod) -> bool
+ metadata = item.metadata
+ if service_type is not None:
+ if metadata.labels['app'] != "rook-ceph-{0}".format(service_type):
+ return False
+
+ if service_id is not None:
+ try:
+ k, v = {
+ "mds": ("rook_file_system", service_id),
+ "osd": ("ceph-osd-id", service_id),
+ "mon": ("mon", service_id),
+ "mgr": ("mgr", service_id),
+ "nfs": ("nfs", service_id),
+ "rgw": ("ceph_rgw", service_id),
+ }[service_type]
+ except KeyError:
+ raise orchestrator.OrchestratorValidationError(
+ '{} not supported'.format(service_type))
+ if metadata.labels[k] != v:
+ return False
+
+ if nodename is not None:
+ if item.spec.node_name != nodename:
+ return False
+ return True
- # import json
- # print json.dumps(pods.items[0])
+ refreshed = datetime_now()
+ pods = [i for i in self.rook_pods.items if predicate(i)]
pods_summary = []
+ prefix = 'sha256:'
- for p in pods.items:
+ for p in pods:
d = p.to_dict()
- # p['metadata']['creationTimestamp']
- # p['metadata']['nodeName']
- pods_summary.append({
+
+ image_name = None
+ for c in d['spec']['containers']:
+ # look at the first listed container in the pod...
+ image_name = c['image']
+ break
+
+ ls = d['status'].get('container_statuses')
+ if not ls:
+ # ignore pods with no containers
+ continue
+ image_id = ls[0]['image_id']
+ image_id = image_id.split(prefix)[1] if prefix in image_id else image_id
+
+ s = {
"name": d['metadata']['name'],
- "nodename": d['spec']['node_name'],
- "labels": d['metadata']['labels']
- })
- pass
+ "hostname": d['spec']['node_name'],
+ "labels": d['metadata']['labels'],
+ 'phase': d['status']['phase'],
+ 'container_image_name': image_name,
+ 'container_image_id': image_id,
+ 'refreshed': refreshed,
+ # these may get set below...
+ 'started': None,
+ 'created': None,
+ }
+
+ # note: we want UTC
+ if d['metadata'].get('creation_timestamp', None):
+ s['created'] = d['metadata']['creation_timestamp'].astimezone(
+ tz=datetime.timezone.utc)
+ if d['status'].get('start_time', None):
+ s['started'] = d['status']['start_time'].astimezone(
+ tz=datetime.timezone.utc)
+
+ pods_summary.append(s)
return pods_summary
+ def remove_pods(self, names: List[str]) -> List[str]:
+ pods = [i for i in self.rook_pods.items]
+ for p in pods:
+ d = p.to_dict()
+ daemon_type = d['metadata']['labels']['app'].replace('rook-ceph-','')
+ daemon_id = d['metadata']['labels']['ceph_daemon_id']
+ name = daemon_type + '.' + daemon_id
+ if name in names:
+ self.coreV1_api.delete_namespaced_pod(
+ d['metadata']['name'],
+ self.rook_env.namespace,
+ body=client.V1DeleteOptions()
+ )
+ return [f'Removed Pod {n}' for n in names]
+
+ def get_node_names(self) -> List[str]:
+ return [i.metadata.name for i in self.nodes.items]
+
@contextmanager
- def ignore_409(self, what):
+ def ignore_409(self, what: str) -> Iterator[None]:
try:
yield
except ApiException as e:
else:
raise
- def add_filesystem(self, spec):
+ def apply_filesystem(self, spec: ServiceSpec, num_replicas: int,
+ leaf_type: str) -> str:
# TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
# to action.
-
- rook_fs = {
- "apiVersion": self.rook_env.api_name,
- "kind": "CephFilesystem",
- "metadata": {
- "name": spec.name,
- "namespace": self.rook_env.namespace
- },
- "spec": {
- "onlyManageDaemons": True,
- "metadataServer": {
- "activeCount": spec.count,
- "activeStandby": True
-
- }
- }
- }
-
- with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.name)):
- self.rook_api_post("cephfilesystems/", body=rook_fs)
-
- def add_nfsgw(self, spec):
+ all_hosts = self.get_hosts()
+ def _update_fs(new: cfs.CephFilesystem) -> cfs.CephFilesystem:
+ new.spec.metadataServer.activeCount = spec.placement.count or 1
+ new.spec.metadataServer.placement = cfs.Placement(
+ nodeAffinity=cfs.NodeAffinity(
+ requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
+ nodeSelectorTerms=cfs.NodeSelectorTermsList(
+ [placement_spec_to_node_selector(spec.placement, all_hosts)]
+ )
+ )
+ )
+ )
+ return new
+ def _create_fs() -> cfs.CephFilesystem:
+ fs = cfs.CephFilesystem(
+ apiVersion=self.rook_env.api_name,
+ metadata=dict(
+ name=spec.service_id,
+ namespace=self.rook_env.namespace,
+ ),
+ spec=cfs.Spec(
+ dataPools=cfs.DataPoolsList(
+ {
+ cfs.DataPoolsItem(
+ failureDomain=leaf_type,
+ replicated=cfs.Replicated(
+ size=num_replicas
+ )
+ )
+ }
+ ),
+ metadataPool=cfs.MetadataPool(
+ failureDomain=leaf_type,
+ replicated=cfs.Replicated(
+ size=num_replicas
+ )
+ ),
+ metadataServer=cfs.MetadataServer(
+ activeCount=spec.placement.count or 1,
+ activeStandby=True,
+ placement=
+ cfs.Placement(
+ nodeAffinity=cfs.NodeAffinity(
+ requiredDuringSchedulingIgnoredDuringExecution=cfs.RequiredDuringSchedulingIgnoredDuringExecution(
+ nodeSelectorTerms=cfs.NodeSelectorTermsList(
+ [placement_spec_to_node_selector(spec.placement, all_hosts)]
+ )
+ )
+ )
+ )
+ )
+ )
+ )
+ return fs
+ assert spec.service_id is not None
+ return self._create_or_patch(
+ cfs.CephFilesystem, 'cephfilesystems', spec.service_id,
+ _update_fs, _create_fs)
+
+ def get_matching_node(self, host: str) -> Any:
+ matching_node = None
+ for node in self.nodes.items:
+ if node.metadata.labels['kubernetes.io/hostname'] == host:
+ matching_node = node
+ return matching_node
+
+ def add_host_label(self, host: str, label: str) -> OrchResult[str]:
+ matching_node = self.get_matching_node(host)
+        if matching_node is None:
+ return OrchResult(None, RuntimeError(f"Cannot add {label} label to {host}: host not found in cluster"))
+ matching_node.metadata.labels['ceph-label/'+ label] = ""
+ self.coreV1_api.patch_node(host, matching_node)
+ return OrchResult(f'Added {label} label to {host}')
+
+ def remove_host_label(self, host: str, label: str) -> OrchResult[str]:
+ matching_node = self.get_matching_node(host)
+        if matching_node is None:
+ return OrchResult(None, RuntimeError(f"Cannot remove {label} label from {host}: host not found in cluster"))
+ matching_node.metadata.labels.pop('ceph-label/' + label, None)
+ self.coreV1_api.patch_node(host, matching_node)
+ return OrchResult(f'Removed {label} label from {host}')
+
+ def apply_objectstore(self, spec: RGWSpec, num_replicas: int, leaf_type: str) -> str:
+ assert spec.service_id is not None
+
+ name = spec.service_id
+
+ if '.' in spec.service_id:
+            # rook does not like . in the name. It could be there because this
+            # is a legacy rgw spec that was named like $realm.$zone, except
+            # that I doubt there were any users of this code. Instead, focus
+            # on future users and translate . to - (fingers crossed!).
+ name = spec.service_id.replace('.', '-')
+
+ all_hosts = self.get_hosts()
+ def _create_zone() -> cos.CephObjectStore:
+ port = None
+ secure_port = None
+ if spec.ssl:
+ secure_port = spec.get_port()
+ else:
+ port = spec.get_port()
+ object_store = cos.CephObjectStore(
+ apiVersion=self.rook_env.api_name,
+ metadata=dict(
+ name=name,
+ namespace=self.rook_env.namespace
+ ),
+ spec=cos.Spec(
+ gateway=cos.Gateway(
+ port=port,
+ securePort=secure_port,
+ instances=spec.placement.count or 1,
+ placement=cos.Placement(
+ cos.NodeAffinity(
+ requiredDuringSchedulingIgnoredDuringExecution=cos.RequiredDuringSchedulingIgnoredDuringExecution(
+ nodeSelectorTerms=cos.NodeSelectorTermsList(
+ [
+ placement_spec_to_node_selector(spec.placement, all_hosts)
+ ]
+ )
+ )
+ )
+ )
+ ),
+ dataPool=cos.DataPool(
+ failureDomain=leaf_type,
+ replicated=cos.Replicated(
+ size=num_replicas
+ )
+ ),
+ metadataPool=cos.MetadataPool(
+ failureDomain=leaf_type,
+ replicated=cos.Replicated(
+ size=num_replicas
+ )
+ )
+ )
+ )
+ if spec.rgw_zone:
+ object_store.spec.zone=cos.Zone(
+ name=spec.rgw_zone
+ )
+ return object_store
+
+
+ def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
+ if new.spec.gateway:
+ new.spec.gateway.instances = spec.placement.count or 1
+ else:
+ new.spec.gateway=cos.Gateway(
+ instances=spec.placement.count or 1
+ )
+ return new
+ return self._create_or_patch(
+ cos.CephObjectStore, 'cephobjectstores', name,
+ _update_zone, _create_zone)
+
+ def apply_nfsgw(self, spec: NFSServiceSpec, mgr: 'RookOrchestrator') -> str:
# TODO use spec.placement
        # TODO warn if spec.extended has entries we don't know how
# to action.
-
- rook_nfsgw = {
- "apiVersion": self.rook_env.api_name,
- "kind": "CephNFS",
- "metadata": {
- "name": spec.name,
- "namespace": self.rook_env.namespace
- },
- "spec": {
- "rados": {
- "pool": spec.extended["pool"]
- },
- "server": {
- "active": spec.count,
- }
- }
- }
-
- if "namespace" in spec.extended:
- rook_nfsgw["spec"]["rados"]["namespace"] = spec.extended["namespace"]
-
- with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)):
- self.rook_api_post("cephnfses/", body=rook_nfsgw)
-
- def add_objectstore(self, spec):
- rook_os = {
- "apiVersion": self.rook_env.api_name,
- "kind": "CephObjectStore",
- "metadata": {
- "name": spec.name,
- "namespace": self.rook_env.namespace
- },
- "spec": {
- "metadataPool": {
- "failureDomain": "host",
- "replicated": {
- "size": 1
- }
- },
- "dataPool": {
- "failureDomain": "osd",
- "replicated": {
- "size": 1
- }
- },
- "gateway": {
- "type": "s3",
- "port": 80,
- "instances": 1,
- "allNodes": False
- }
- }
- }
-
- with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)):
- self.rook_api_post("cephobjectstores/", body=rook_os)
-
- def rm_service(self, service_type, service_id):
- assert service_type in ("mds", "rgw", "nfs")
-
- if service_type == "mds":
- rooktype = "cephfilesystems"
- elif service_type == "rgw":
- rooktype = "cephobjectstores"
- elif service_type == "nfs":
- rooktype = "cephnfses"
-
+ # TODO Number of pods should be based on the list of hosts in the
+ # PlacementSpec.
+ assert spec.service_id, "service id in NFS service spec cannot be an empty string or None " # for mypy typing
+ service_id = spec.service_id
+ mgr_module = cast(Module, mgr)
+ count = spec.placement.count or 1
+ def _update_nfs(new: cnfs.CephNFS) -> cnfs.CephNFS:
+ new.spec.server.active = count
+ return new
+
+ def _create_nfs() -> cnfs.CephNFS:
+ rook_nfsgw = cnfs.CephNFS(
+ apiVersion=self.rook_env.api_name,
+ metadata=dict(
+ name=spec.service_id,
+ namespace=self.rook_env.namespace,
+ ),
+ spec=cnfs.Spec(
+ rados=cnfs.Rados(
+ namespace=service_id,
+ pool=NFS_POOL_NAME,
+ ),
+ server=cnfs.Server(
+ active=count
+ )
+ )
+ )
+
+
+ return rook_nfsgw
+
+ create_ganesha_pool(mgr)
+ NFSRados(mgr_module.rados, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
+ return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
+ _update_nfs, _create_nfs)
+
+ def rm_service(self, rooktype: str, service_id: str) -> str:
+ self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=rooktype, name=service_id)
objpath = "{0}/{1}".format(rooktype, service_id)
+ return f'Removed {objpath}'
- try:
- self.rook_api_delete(objpath)
- except ApiException as e:
- if e.status == 404:
- log.info("{0} service '{1}' does not exist".format(service_type, service_id))
- # Idempotent, succeed.
- else:
- raise
+ def get_resource(self, resource_type: str) -> Iterable:
+ custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object, group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=resource_type)
+ return custom_objects.items
- def can_create_osd(self):
+ def can_create_osd(self) -> bool:
current_cluster = self.rook_api_get(
"cephclusters/{0}".format(self.rook_env.cluster_name))
use_all_nodes = current_cluster['spec'].get('useAllNodes', False)
# to anything we put in 'nodes', so can't do OSD creation.
return not use_all_nodes
- def node_exists(self, node_name):
- try:
- self.k8s.read_node(node_name, exact=False, export=True)
- except ApiException as e:
- if e.status == 404:
- return False
- else:
- raise
+ def node_exists(self, node_name: str) -> bool:
+ return node_name in self.get_node_names()
+
+ def update_mon_count(self, newcount: Optional[int]) -> str:
+ def _update_mon_count(current, new):
+ # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+ if newcount is None:
+ raise orchestrator.OrchestratorError('unable to set mon count to None')
+ if not new.spec.mon:
+ raise orchestrator.OrchestratorError("mon attribute not specified in new spec")
+ new.spec.mon.count = newcount
+ return new
+ return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
+
+ def add_osds(self, drive_group, matching_hosts):
+ # type: (DriveGroupSpec, List[str]) -> str
+ assert drive_group.objectstore in ("bluestore", "filestore")
+ assert drive_group.service_id
+ storage_class = self.get_storage_class()
+ inventory = self.get_discovered_devices()
+ creator: Optional[DefaultCreator] = None
+ if (
+ storage_class.metadata.labels
+ and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
+ ):
+ creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
else:
- return True
+ creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
+ return self._patch(
+ ccl.CephCluster,
+ 'cephclusters',
+ self.rook_env.cluster_name,
+ creator.add_osds(self.rook_pods, drive_group, matching_hosts)
+ )
+
+ def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
+ inventory = self.get_discovered_devices()
+ self.remover = DefaultRemover(
+ self.coreV1_api,
+ self.batchV1_api,
+ self.appsV1_api,
+ osd_ids,
+ replace,
+ force,
+ mon_command,
+ self._patch,
+ self.rook_env,
+ inventory
+ )
+ return self.remover.remove()
+
+ def get_hosts(self) -> List[orchestrator.HostSpec]:
+ ret = []
+ for node in self.nodes.items:
+ spec = orchestrator.HostSpec(
+ node.metadata.name,
+ addr='/'.join([addr.address for addr in node.status.addresses]),
+ labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
+ )
+ ret.append(spec)
+ return ret
+
+ def create_zap_job(self, host: str, path: str) -> None:
+ body = client.V1Job(
+ api_version="batch/v1",
+ metadata=client.V1ObjectMeta(
+ name="rook-ceph-device-zap",
+ namespace="rook-ceph"
+ ),
+ spec=client.V1JobSpec(
+ template=client.V1PodTemplateSpec(
+ spec=client.V1PodSpec(
+ containers=[
+ client.V1Container(
+ name="device-zap",
+ image="rook/ceph:master",
+ command=["bash"],
+ args=["-c", f"ceph-volume raw list {path} && dd if=/dev/zero of=\"{path}\" bs=1M count=1 oflag=direct,dsync || ceph-volume lvm zap --destroy {path}"],
+ env=[
+ client.V1EnvVar(
+ name="ROOK_CEPH_USERNAME",
+ value_from=client.V1EnvVarSource(
+ secret_key_ref=client.V1SecretKeySelector(
+ key="ceph-username",
+ name="rook-ceph-mon"
+ )
+ )
+ ),
+ client.V1EnvVar(
+ name="ROOK_CEPH_SECRET",
+ value_from=client.V1EnvVarSource(
+ secret_key_ref=client.V1SecretKeySelector(
+ key="ceph-secret",
+ name="rook-ceph-mon"
+ )
+ )
+ )
+ ],
+ security_context=client.V1SecurityContext(
+ run_as_user=0,
+ privileged=True
+ ),
+ volume_mounts=[
+ client.V1VolumeMount(
+ mount_path="/etc/ceph",
+ name="ceph-conf-emptydir"
+ ),
+ client.V1VolumeMount(
+ mount_path="/etc/rook",
+ name="rook-config"
+ ),
+ client.V1VolumeMount(
+ mount_path="/dev",
+ name="devices"
+ )
+ ]
+ )
+ ],
+ volumes=[
+ client.V1Volume(
+ name="ceph-conf-emptydir",
+ empty_dir=client.V1EmptyDirVolumeSource()
+ ),
+ client.V1Volume(
+ name="rook-config",
+ empty_dir=client.V1EmptyDirVolumeSource()
+ ),
+ client.V1Volume(
+ name="devices",
+ host_path=client.V1HostPathVolumeSource(
+ path="/dev"
+ )
+ ),
+ ],
+ node_selector={
+ "kubernetes.io/hostname": host
+ },
+ restart_policy="Never"
+ )
+ )
+ )
+ )
+ self.batchV1_api.create_namespaced_job('rook-ceph', body)
+
+ def rbd_mirror(self, spec: ServiceSpec) -> None:
+ service_id = spec.service_id or "default-rbd-mirror"
+ all_hosts = self.get_hosts()
+ def _create_rbd_mirror() -> crbdm.CephRBDMirror:
+ return crbdm.CephRBDMirror(
+ apiVersion=self.rook_env.api_name,
+ metadata=dict(
+ name=service_id,
+ namespace=self.rook_env.namespace,
+ ),
+ spec=crbdm.Spec(
+ count=spec.placement.count or 1,
+ placement=crbdm.Placement(
+ nodeAffinity=crbdm.NodeAffinity(
+ requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
+ nodeSelectorTerms=crbdm.NodeSelectorTermsList(
+ [
+ placement_spec_to_node_selector(spec.placement, all_hosts)
+ ]
+ )
+ )
+ )
+ )
+ )
+ )
+ def _update_rbd_mirror(new: crbdm.CephRBDMirror) -> crbdm.CephRBDMirror:
+ new.spec.count = spec.placement.count or 1
+ new.spec.placement = crbdm.Placement(
+ nodeAffinity=crbdm.NodeAffinity(
+ requiredDuringSchedulingIgnoredDuringExecution=crbdm.RequiredDuringSchedulingIgnoredDuringExecution(
+ nodeSelectorTerms=crbdm.NodeSelectorTermsList(
+ [
+ placement_spec_to_node_selector(spec.placement, all_hosts)
+ ]
+ )
+ )
+ )
+ )
+ return new
+ self._create_or_patch(crbdm.CephRBDMirror, 'cephrbdmirrors', service_id, _update_rbd_mirror, _create_rbd_mirror)
+
+    def _patch(self, crd: Type, crd_name: str, cr_name: str, func: Callable[[CrdClassT, CrdClassT], CrdClassT]) -> str:
+ current_json = self.rook_api_get(
+ "{}/{}".format(crd_name, cr_name)
+ )
+
+ current = crd.from_json(current_json)
+ new = crd.from_json(current_json) # no deepcopy.
+
+ new = func(current, new)
+
+ patch = list(jsonpatch.make_patch(current_json, new.to_json()))
+
+ log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
- def update_mon_count(self, newcount):
- patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]
+ if len(patch) == 0:
+ return "No change"
try:
self.rook_api_patch(
- "cephclusters/{0}".format(self.rook_env.cluster_name),
+ "{}/{}".format(crd_name, cr_name),
body=patch)
except ApiException as e:
log.exception("API exception: {0}".format(e))
raise ApplyException(
- "Failed to update mon count in Cluster CRD: {0}".format(e))
-
- return "Updated mon count to {0}".format(newcount)
+ "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
- def update_nfs_count(self, svc_id, newcount):
- patch = [{"op": "replace", "path": "/spec/server/active", "value": newcount}]
+ return "Success"
+ def _create_or_patch(self,
+ crd: Type,
+ crd_name: str,
+ cr_name: str,
+ update_func: Callable[[CrdClassT], CrdClassT],
+ create_func: Callable[[], CrdClassT]) -> str:
try:
- self.rook_api_patch(
- "cephnfses/{0}".format(svc_id),
- body=patch)
+ current_json = self.rook_api_get(
+ "{}/{}".format(crd_name, cr_name)
+ )
except ApiException as e:
- log.exception("API exception: {0}".format(e))
- raise ApplyException(
- "Failed to update NFS server count for {0}: {1}".format(svc_id, e))
- return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
-
- def add_osds(self, drive_group, all_hosts):
- # type: (orchestrator.DriveGroupSpec, List[str]) -> str
- """
- Rook currently (0.8) can only do single-drive OSDs, so we
- treat all drive groups as just a list of individual OSDs.
- """
- block_devices = drive_group.data_devices.paths if drive_group.data_devices else None
- directories = drive_group.data_directories
-
- assert drive_group.objectstore in ("bluestore", "filestore")
-
- # The CRD looks something like this:
- # nodes:
- # - name: "gravel1.rockery"
- # devices:
- # - name: "sdb"
- # config:
- # storeType: bluestore
-
- current_cluster = self.rook_api_get(
- "cephclusters/{0}".format(self.rook_env.cluster_name))
-
- patch = []
+ if e.status == 404:
+ current_json = None
+ else:
+ raise
- # FIXME: this is all not really atomic, because jsonpatch doesn't
- # let us do "test" operations that would check if items with
- # matching names were in existing lists.
+ if current_json:
+ new = crd.from_json(current_json) # no deepcopy.
- if 'nodes' not in current_cluster['spec']['storage']:
- patch.append({
- 'op': 'add', 'path': '/spec/storage/nodes', 'value': []
- })
+ new = update_func(new)
- current_nodes = current_cluster['spec']['storage'].get('nodes', [])
+ patch = list(jsonpatch.make_patch(current_json, new.to_json()))
- if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]:
- pd = { "name": drive_group.hosts(all_hosts)[0],
- "config": { "storeType": drive_group.objectstore }}
+ log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))
- if block_devices:
- pd["devices"] = [{'name': d} for d in block_devices]
- if directories:
- pd["directories"] = [{'path': p} for p in directories]
+ if len(patch) == 0:
+ return "No change"
- patch.append({ "op": "add", "path": "/spec/storage/nodes/-", "value": pd })
+ try:
+ self.rook_api_patch(
+ "{}/{}".format(crd_name, cr_name),
+ body=patch)
+ except ApiException as e:
+ log.exception("API exception: {0}".format(e))
+ raise ApplyException(
+ "Failed to update {}/{}: {}".format(crd_name, cr_name, e))
+ return "Updated"
else:
- # Extend existing node
- node_idx = None
- current_node = None
- for i, c in enumerate(current_nodes):
- if c['name'] == drive_group.hosts(all_hosts)[0]:
- current_node = c
- node_idx = i
- break
-
- assert node_idx is not None
- assert current_node is not None
-
- new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']]))
- for n in new_devices:
- patch.append({
- "op": "add",
- "path": "/spec/storage/nodes/{0}/devices/-".format(node_idx),
- "value": {'name': n}
- })
-
- new_dirs = list(set(directories) - set(current_node['directories']))
- for p in new_dirs:
- patch.append({
- "op": "add",
- "path": "/spec/storage/nodes/{0}/directories/-".format(node_idx),
- "value": {'path': p}
- })
+ new = create_func()
+ with self.ignore_409("{} {} already exists".format(crd_name,
+ cr_name)):
+ self.rook_api_post("{}/".format(crd_name),
+ body=new.to_json())
+ return "Created"
+ def get_ceph_image(self) -> str:
+ try:
+ api_response = self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+ label_selector="app=rook-ceph-mon",
+ timeout_seconds=10)
+ if api_response.items:
+ return api_response.items[-1].spec.containers[0].image
+ else:
+ raise orchestrator.OrchestratorError(
+ "Error getting ceph image. Cluster without monitors")
+ except ApiException as e:
+ raise orchestrator.OrchestratorError("Error getting ceph image: {}".format(e))
+
+
+ def _execute_blight_job(self, ident_fault: str, on: bool, loc: orchestrator.DeviceLightLoc) -> str:
+ operation_id = str(hash(loc))
+ message = ""
+
+ # job definition
+ job_metadata = client.V1ObjectMeta(name=operation_id,
+ namespace= self.rook_env.namespace,
+ labels={"ident": operation_id})
+ pod_metadata = client.V1ObjectMeta(labels={"ident": operation_id})
+ pod_container = client.V1Container(name="ceph-lsmcli-command",
+ security_context=client.V1SecurityContext(privileged=True),
+ image=self.get_ceph_image(),
+ command=["lsmcli",],
+ args=['local-disk-%s-led-%s' % (ident_fault,'on' if on else 'off'),
+ '--path', loc.path or loc.dev,],
+ volume_mounts=[client.V1VolumeMount(name="devices", mount_path="/dev"),
+ client.V1VolumeMount(name="run-udev", mount_path="/run/udev")])
+ pod_spec = client.V1PodSpec(containers=[pod_container],
+ active_deadline_seconds=30, # Max time to terminate pod
+ restart_policy="Never",
+ node_selector= {"kubernetes.io/hostname": loc.host},
+ volumes=[client.V1Volume(name="devices",
+ host_path=client.V1HostPathVolumeSource(path="/dev")),
+ client.V1Volume(name="run-udev",
+ host_path=client.V1HostPathVolumeSource(path="/run/udev"))])
+ pod_template = client.V1PodTemplateSpec(metadata=pod_metadata,
+ spec=pod_spec)
+ job_spec = client.V1JobSpec(active_deadline_seconds=60, # Max time to terminate job
+                                    ttl_seconds_after_finished=10, # Alpha. Lifetime after finishing (either Complete or Failed)
+ backoff_limit=0,
+ template=pod_template)
+ job = client.V1Job(api_version="batch/v1",
+ kind="Job",
+ metadata=job_metadata,
+ spec=job_spec)
+
+ # delete previous job if it exists
+ try:
+ try:
+ api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+ self.rook_env.namespace,
+ propagation_policy="Background")
+ except ApiException as e:
+ if e.status != 404: # No problem if the job does not exist
+ raise
+
+ # wait until the job is not present
+ deleted = False
+ retries = 0
+ while not deleted and retries < 10:
+ api_response = self.batchV1_api.list_namespaced_job(self.rook_env.namespace,
+ label_selector="ident=%s" % operation_id,
+ timeout_seconds=10)
+ deleted = not api_response.items
+ if retries > 5:
+ sleep(0.1)
+ retries += 1
+ if retries == 10 and not deleted:
+ raise orchestrator.OrchestratorError(
+ "Light <{}> in <{}:{}> cannot be executed. Cannot delete previous job <{}>".format(
+ on, loc.host, loc.path or loc.dev, operation_id))
+
+ # create the job
+ api_response = self.batchV1_api.create_namespaced_job(self.rook_env.namespace, job)
+
+ # get the result
+ finished = False
+ while not finished:
+ api_response = self.batchV1_api.read_namespaced_job(operation_id,
+ self.rook_env.namespace)
+ finished = api_response.status.succeeded or api_response.status.failed
+ if finished:
+ message = api_response.status.conditions[-1].message
+
+ # get the result of the lsmcli command
+ api_response=self.coreV1_api.list_namespaced_pod(self.rook_env.namespace,
+ label_selector="ident=%s" % operation_id,
+ timeout_seconds=10)
+ if api_response.items:
+ pod_name = api_response.items[-1].metadata.name
+ message = self.coreV1_api.read_namespaced_pod_log(pod_name,
+ self.rook_env.namespace)
- if len(patch) == 0:
- return "No change"
+ except ApiException as e:
+ log.exception('K8s API failed. {}'.format(e))
+ raise
+ # Finally, delete the job.
+        # The job uses <ttl_seconds_after_finished>, which makes the TTL controller
+        # delete the job automatically. This feature is in Alpha state, so an extra
+        # explicit delete of the Job is issued here as well.
try:
- self.rook_api_patch(
- "cephclusters/{0}".format(self.rook_env.cluster_name),
- body=patch)
+ api_response = self.batchV1_api.delete_namespaced_job(operation_id,
+ self.rook_env.namespace,
+ propagation_policy="Background")
except ApiException as e:
- log.exception("API exception: {0}".format(e))
- raise ApplyException(
- "Failed to create OSD entries in Cluster CRD: {0}".format(
- e))
+ if e.status != 404: # No problem if the job does not exist
+ raise
- return "Success"
+ return message
+
+ def blink_light(self, ident_fault, on, locs):
+ # type: (str, bool, List[orchestrator.DeviceLightLoc]) -> List[str]
+ return [self._execute_blight_job(ident_fault, on, loc) for loc in locs]
+
+def placement_spec_to_node_selector(spec: PlacementSpec, all_hosts: List) -> ccl.NodeSelectorTermsItem:
+ all_hostnames = [hs.hostname for hs in all_hosts]
+ res = ccl.NodeSelectorTermsItem(matchExpressions=ccl.MatchExpressionsList())
+ if spec.host_pattern and spec.host_pattern != "*":
+ raise RuntimeError("The Rook orchestrator only supports a host_pattern of * for placements")
+ if spec.label:
+ res.matchExpressions.append(
+ ccl.MatchExpressionsItem(
+ key="ceph-label/" + spec.label,
+ operator="Exists"
+ )
+ )
+ if spec.hosts:
+ host_list = [h.hostname for h in spec.hosts if h.hostname in all_hostnames]
+ res.matchExpressions.append(
+ ccl.MatchExpressionsItem(
+ key="kubernetes.io/hostname",
+ operator="In",
+ values=ccl.CrdObjectList(host_list)
+ )
+ )
+ if spec.host_pattern == "*" or (not spec.label and not spec.hosts and not spec.host_pattern):
+ res.matchExpressions.append(
+ ccl.MatchExpressionsItem(
+ key="kubernetes.io/hostname",
+ operator="Exists",
+ )
+ )
+ return res
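+# Example (sketch): PlacementSpec(label='foo') yields a single matchExpression
+# {key: 'ceph-label/foo', operator: Exists}; PlacementSpec(hosts=['node1'])
+# yields {key: 'kubernetes.io/hostname', operator: In, values: ['node1']}; an
+# empty placement falls through to {key: 'kubernetes.io/hostname',
+# operator: Exists}, i.e. schedulable on any host.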
+
+def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> PlacementSpec:
+ res = PlacementSpec()
+ for expression in node_selector.matchExpressions:
+ if expression.key.startswith("ceph-label/"):
+ res.label = expression.key.split('/')[1]
+ elif expression.key == "kubernetes.io/hostname":
+ if expression.operator == "Exists":
+ res.host_pattern = "*"
+ elif expression.operator == "In":
+                res.hosts = [HostPlacementSpec(hostname=value, network='', name='') for value in expression.values]
+ return res