+def threaded(f: Callable[..., None]) -> Callable[..., threading.Thread]:
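+    """Decorator: run the wrapped callable in a new thread and return the started Thread."""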
+ def wrapper(*args: Any, **kwargs: Any) -> threading.Thread:
+ t = threading.Thread(target=f, args=args, kwargs=kwargs)
+ t.start()
+ return t
+
+ return cast(Callable[..., threading.Thread], wrapper)
+
+
+class DefaultFetcher():
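+    """Fetch PersistentVolumes of the given storage class and convert them to orchestrator Devices."""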
+ def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
+ self.storage_class = storage_class
+ self.coreV1_api = coreV1_api
+
+ def fetch(self) -> None:
+ self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
+ self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]
+
+ def convert_size(self, size_str: str) -> int:
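+        """Convert a Kubernetes capacity string such as '10Gi' to a size in bytes."""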
+ units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
+ coeff_and_unit = re.search('(\d+)(\D+)', size_str)
+ assert coeff_and_unit is not None
+ coeff = int(coeff_and_unit[1])
+ unit = coeff_and_unit[2]
+ try:
+ factor = units.index(unit) % 7
+ except ValueError:
+ log.error("PV size format invalid")
+ raise
+ size = coeff * (2 ** (10 * factor))
+ return size
+
+ def devices(self) -> Dict[str, List[Device]]:
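+        """Group the discovered Devices by node name."""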
+ nodename_to_devices: Dict[str, List[Device]] = {}
+ for i in self.pvs_in_sc:
+ node, device = self.device(i)
+ if node not in nodename_to_devices:
+ nodename_to_devices[node] = []
+ nodename_to_devices[node].append(device)
+ return nodename_to_devices
+
+ def device(self, i: 'client.V1PersistentVolume') -> Tuple[str, Device]:
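+        """Map a PersistentVolume to a (node name, Device) tuple, deriving the node from the PV's node affinity."""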
+ node = 'N/A'
+ if i.spec.node_affinity:
+ terms = i.spec.node_affinity.required.node_selector_terms
+            if (len(terms) == 1
+                    and len(terms[0].match_expressions) == 1
+                    and terms[0].match_expressions[0].key == 'kubernetes.io/hostname'
+                    and len(terms[0].match_expressions[0].values) == 1):
+ node = terms[0].match_expressions[0].values[0]
+ size = self.convert_size(i.spec.capacity['storage'])
+        if i.spec.host_path:
+            path = i.spec.host_path.path
+        elif i.spec.local:
+            path = i.spec.local.path
+        elif i.metadata.annotations and 'storage.openshift.com/device-name' in i.metadata.annotations:
+            path = '/dev/' + i.metadata.annotations['storage.openshift.com/device-name']
+        else:
+            path = ''
+ state = i.spec.volume_mode == 'Block' and i.status.phase == 'Available'
+ pv_name = i.metadata.name
+ device = Device(
+ path = path,
+ sys_api = dict(
+ size = size,
+ node = node,
+ pv_name = pv_name
+ ),
+ available = state,
+ )
+ return (node, device)
+
+
+class LSOFetcher(DefaultFetcher):
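+    """Fetcher for clusters using the Local Storage Operator (LSO); enriches PVs with data from localvolumediscoveryresults."""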
+ def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
+ super().__init__(storage_class, coreV1_api)
+ self.customObjects_api = customObjects_api
+ self.nodenames = nodenames
+
+ def fetch(self) -> None:
+ super().fetch()
+ self.discovery: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_cluster_custom_object,
+ group="local.storage.openshift.io",
+ version="v1alpha1",
+ plural="localvolumediscoveryresults")
+
+    def predicate(self, item: Dict[str, Any]) -> bool:
+ if self.nodenames is not None:
+ return item['spec']['nodeName'] in self.nodenames
+ else:
+ return True
+
+ def devices(self) -> Dict[str, List[Device]]:
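+        """Group Devices by node, using LSO discovery data when the PV carries a device-id annotation."""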
+ try:
+ lso_discovery_results = [i for i in self.discovery.items if self.predicate(i)]
+ except ApiException as dummy_e:
+ log.error("Failed to fetch device metadata")
+ raise
+ self.lso_devices = {}
+ for i in lso_discovery_results:
+ drives = i['status']['discoveredDevices']
+ for drive in drives:
+ self.lso_devices[drive['deviceID'].split('/')[-1]] = drive
+ nodename_to_devices: Dict[str, List[Device]] = {}
+ for i in self.pvs_in_sc:
+ node, device = (None, None)
+            if (not i.metadata.annotations
+                    or 'storage.openshift.com/device-id' not in i.metadata.annotations
+                    or i.metadata.annotations['storage.openshift.com/device-id'] not in self.lso_devices):
+ node, device = super().device(i)
+ else:
+ node, device = self.device(i)
+ if node not in nodename_to_devices:
+ nodename_to_devices[node] = []
+ nodename_to_devices[node].append(device)
+ return nodename_to_devices
+
+ def device(self, i: Any) -> Tuple[str, Device]:
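+        """Build a Device from the LSO discovery entry referenced by the PV's device-id annotation."""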
+ node = i.metadata.labels['kubernetes.io/hostname']
+ device_discovery = self.lso_devices[i.metadata.annotations['storage.openshift.com/device-id']]
+ pv_name = i.metadata.name
+        model_tokens = device_discovery['model'].split()
+        vendor: str = model_tokens[0] if model_tokens else ''
+        model: str = ' '.join(model_tokens[1:]) if len(model_tokens) > 1 else ''
+ device = Device(
+ path = device_discovery['path'],
+ sys_api = dict(
+ size = device_discovery['size'],
+ rotational = '1' if device_discovery['property']=='Rotational' else '0',
+ node = node,
+ pv_name = pv_name,
+ model = model,
+ vendor = vendor
+ ),
+ available = device_discovery['status']['state']=='Available',
+ device_id = device_discovery['deviceID'].split('/')[-1],
+ lsm_data = dict(
+ serialNum = device_discovery['serial']
+ )
+ )
+ return (node, device)
+
+
+class PDFetcher(DefaultFetcher):
+ """ Physical Devices Fetcher"""
+ def __init__(self, coreV1_api: 'client.CoreV1Api'):
+ self.coreV1_api = coreV1_api
+
+ def fetch(self) -> None:
+ """ Collect the devices information from k8s configmaps"""
+ self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
+ namespace='rook-ceph',
+ label_selector='app=rook-discover')
+
+ def devices(self) -> Dict[str, List[Device]]:
+ """ Return the list of devices found"""
+ node_devices: Dict[str, List[Device]] = {}
+ for i in self.dev_cms.items:
+ devices_list: List[Device] = []
+ for d in json.loads(i.data['devices']):
+ devices_list.append(self.device(d)[1])
+ node_devices[i.metadata.labels['rook.io/node']] = devices_list
+
+ return node_devices
+
+    def device(self, devData: Dict[str, Any]) -> Tuple[str, Device]:
+ """ Build an orchestrator device """
+ if 'cephVolumeData' in devData and devData['cephVolumeData']:
+ return "", Device.from_json(json.loads(devData['cephVolumeData']))
+ else:
+ return "", Device(
+ path='/dev/' + devData['name'],
+ sys_api=dict(
+ rotational='1' if devData['rotational'] else '0',
+ size=devData['size']
+ ),
+ available=False,
+ rejected_reasons=['device data coming from ceph-volume not provided'],
+ )
+
+
+class KubernetesResource(Generic[T]):
+ def __init__(self, api_func: Callable, **kwargs: Any) -> None:
+ """
+ Generic kubernetes Resource parent class
+
+ The api fetch and watch methods should be common across resource types,
+
+ Exceptions in the runner thread are propagated to the caller.
+
+ :param api_func: kubernetes client api function that is passed to the watcher
+ :param filter_func: signature: ``(Item) -> bool``.
+ """
+ self.kwargs = kwargs
+ self.api_func = api_func
+
+ # ``_items`` is accessed by different threads. I assume assignment is atomic.
+ self._items: Dict[str, T] = dict()
+ self.thread = None # type: Optional[threading.Thread]
+ self.exception: Optional[Exception] = None
+ if not _urllib3_supports_read_chunked:
+            log.info('urllib3 is too old. Fallback to full fetches')
+
+ def _fetch(self) -> str:
+ """ Execute the requested api method as a one-off fetch"""
+ response = self.api_func(**self.kwargs)
+ metadata = response.metadata
+ self._items = {item.metadata.name: item for item in response.items}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+ return metadata.resource_version
+
+ @property
+ def items(self) -> Iterable[T]:
+ """
+ Returns the items of the request.
+ Creates the watcher as a side effect.
+ :return:
+ """
+ if self.exception:
+ e = self.exception
+ self.exception = None
+ raise e # Propagate the exception to the user.
+ if not self.thread or not self.thread.is_alive():
+ resource_version = self._fetch()
+ if _urllib3_supports_read_chunked:
+ # Start a thread which will use the kubernetes watch client against a resource
+ log.debug("Attaching resource watcher for k8s {}".format(self.api_func))
+ self.thread = self._watch(resource_version)
+
+ return self._items.values()
+
+ def get_item_name(self, item: Any) -> Any:
+ try:
+ return item.metadata.name
+ except AttributeError:
+ raise AttributeError(
+ "{} doesn't contain a metadata.name. Unable to track changes".format(
+ self.api_func))
+
+    @threaded
+ def _watch(self, res_ver: Optional[str]) -> None:
+ """ worker thread that runs the kubernetes watch """
+
+ self.exception = None
+
+ w = watch.Watch()
+
+ try:
+ # execute generator to continually watch resource for changes
+ for event in w.stream(self.api_func, resource_version=res_ver, watch=True,
+ **self.kwargs):
+ self.health = ''
+ item = event['object']
+ name = self.get_item_name(item)
+
+ log.info('{} event: {}'.format(event['type'], name))
+
+ if event['type'] in ('ADDED', 'MODIFIED'):
+ self._items = merge_dicts(self._items, {name: item})
+ elif event['type'] == 'DELETED':
+ self._items = {k:v for k,v in self._items.items() if k != name}
+ elif event['type'] == 'BOOKMARK':
+ pass
+ elif event['type'] == 'ERROR':
+ raise ApiException(str(event))
+ else:
+ raise KeyError('Unknown watch event {}'.format(event['type']))
+ except ProtocolError as e:
+ if 'Connection broken' in str(e):
+ log.info('Connection reset.')
+ return
+ raise
+ except ApiException as e:
+ log.exception('K8s API failed. {}'.format(self.api_func))
+ self.exception = e
+ raise
+ except Exception as e:
+ log.exception("Watcher failed. ({})".format(self.api_func))
+ self.exception = e
+ raise
+
+class KubernetesCustomResource(KubernetesResource):
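+    """KubernetesResource variant for custom objects, which the client returns as plain dicts instead of typed models."""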
+ def _fetch(self) -> str:
+ response = self.api_func(**self.kwargs)
+ metadata = response['metadata']
+ self._items = {item['metadata']['name']: item for item in response['items']}
+ log.info('Full fetch of {}. result: {}'.format(self.api_func, len(self._items)))
+ return metadata['resourceVersion']
+
+ def get_item_name(self, item: Any) -> Any:
+ try:
+ return item['metadata']['name']
+        except (KeyError, TypeError):
+ raise AttributeError(
+ "{} doesn't contain a metadata.name. Unable to track changes".format(
+ self.api_func))
+
+class DefaultCreator():
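+    """Translate a DriveGroupSpec and the device inventory into storageClassDeviceSets on the CephCluster CR."""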
+ def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
+ self.coreV1_api = coreV1_api
+ self.storage_class = storage_class
+ self.inventory = inventory
+
+ def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
+ device_set = ccl.StorageClassDeviceSetsItem(
+ name=d.sys_api['pv_name'],
+ volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
+ count=1,
+ encrypted=drive_group.encrypted,
+ portable=False
+ )
+ device_set.volumeClaimTemplates.append(
+ ccl.VolumeClaimTemplatesItem(
+ metadata=ccl.Metadata(
+ name="data"
+ ),
+ spec=ccl.Spec(
+ storageClassName=self.storage_class,
+ volumeMode="Block",
+ accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
+ resources={
+ "requests":{
+ "storage": 1
+ }
+ },
+ volumeName=d.sys_api['pv_name']
+ )
+ )
+ )
+ return device_set
+
+ def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
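+        """Select available devices matching the drive group's filters; devices already backing an OSD count against the limit."""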
+ device_list = []
+ assert drive_group.data_devices is not None
+ sizematcher: Optional[SizeMatcher] = None
+ if drive_group.data_devices.size:
+ sizematcher = SizeMatcher('size', drive_group.data_devices.size)
+ limit = getattr(drive_group.data_devices, 'limit', None)
+ count = 0
+ all = getattr(drive_group.data_devices, 'all', None)
+ paths = [device.path for device in drive_group.data_devices.paths]
+ osd_list = []
+ for pod in rook_pods.items:
+ if (
+ hasattr(pod, 'metadata')
+ and hasattr(pod.metadata, 'labels')
+ and 'osd' in pod.metadata.labels
+ and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
+ ):
+ osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+ for _, node in self.inventory.items():
+ for device in node:
+ if device.sys_api['pv_name'] in osd_list:
+ count += 1
+ for _, node in self.inventory.items():
+ for device in node:
+ if not limit or (count < limit):
+ if device.available:
+ if (
+ all
+ or (
+ device.sys_api['node'] in matching_hosts
+                                and (sizematcher is None or sizematcher.compare(device))
+ and (
+ not drive_group.data_devices.paths
+ or (device.path in paths)
+ )
+ )
+ ):
+ device_list.append(device)
+ count += 1
+
+ return device_list
+
+ def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
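+        """Return a callback that patches the CephCluster CR, appending a device set for each newly selected device."""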
+        to_create = self.filter_devices(rook_pods, drive_group, matching_hosts)
+ assert drive_group.data_devices is not None
+ def _add_osds(current_cluster, new_cluster):
+ # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+ if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
+ new_cluster.spec.storage = ccl.Storage()
+
+ if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
+ new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
+
+ existing_scds = [
+ scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
+ ]
+ for device in to_create:
+ new_scds = self.device_to_device_set(drive_group, device)
+ if new_scds.name not in existing_scds:
+ new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
+ return new_cluster
+ return _add_osds
+
+class LSOCreator(DefaultCreator):
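+    """DefaultCreator variant that also filters on the vendor and model reported by the Local Storage Operator."""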
+ def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
+ device_list = []
+ assert drive_group.data_devices is not None
+ sizematcher = None
+ if drive_group.data_devices.size:
+ sizematcher = SizeMatcher('size', drive_group.data_devices.size)
+ limit = getattr(drive_group.data_devices, 'limit', None)
+ all = getattr(drive_group.data_devices, 'all', None)
+ paths = [device.path for device in drive_group.data_devices.paths]
+ vendor = getattr(drive_group.data_devices, 'vendor', None)
+ model = getattr(drive_group.data_devices, 'model', None)
+ count = 0
+ osd_list = []
+ for pod in rook_pods.items:
+ if (
+ hasattr(pod, 'metadata')
+ and hasattr(pod.metadata, 'labels')
+ and 'osd' in pod.metadata.labels
+ and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
+ ):
+ osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
+ for _, node in self.inventory.items():
+ for device in node:
+ if device.sys_api['pv_name'] in osd_list:
+ count += 1
+ for _, node in self.inventory.items():
+ for device in node:
+ if not limit or (count < limit):
+ if device.available:
+ if (
+ all
+ or (
+ device.sys_api['node'] in matching_hosts
+                                and (sizematcher is None or sizematcher.compare(device))
+ and (
+ not drive_group.data_devices.paths
+ or device.path in paths
+ )
+ and (
+ not vendor
+ or device.sys_api['vendor'] == vendor
+ )
+ and (
+ not model
+                                    or device.sys_api['model'].startswith(model)
+ )
+ )
+ ):
+ device_list.append(device)
+ count += 1
+ return device_list
+
+class DefaultRemover():
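+    """Remove OSDs: patch the CephCluster CR, clean up deployments, jobs and PVCs, then purge or destroy the OSDs in Ceph."""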
+ def __init__(
+ self,
+ coreV1_api: 'client.CoreV1Api',
+ batchV1_api: 'client.BatchV1Api',
+ appsV1_api: 'client.AppsV1Api',
+ osd_ids: List[str],
+ replace_flag: bool,
+ force_flag: bool,
+ mon_command: Callable,
+ patch: Callable,
+ rook_env: 'RookEnv',
+ inventory: Dict[str, List[Device]]
+ ):
+ self.batchV1_api = batchV1_api
+ self.appsV1_api = appsV1_api
+ self.coreV1_api = coreV1_api
+
+ self.osd_ids = osd_ids
+ self.replace_flag = replace_flag
+ self.force_flag = force_flag
+
+ self.mon_command = mon_command
+
+ self.patch = patch
+ self.rook_env = rook_env
+
+ self.inventory = inventory
+ self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
+ self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
+ self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')
+
+
+ def remove_device_sets(self) -> str:
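+        """Patch the CephCluster CR to shrink or drop the device sets backing the OSDs being removed."""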
+ self.to_remove: Dict[str, int] = {}
+ self.pvc_to_remove: List[str] = []
+ for pod in self.osd_pods.items:
+ if (
+ hasattr(pod, 'metadata')
+ and hasattr(pod.metadata, 'labels')
+ and 'osd' in pod.metadata.labels
+ and pod.metadata.labels['osd'] in self.osd_ids
+ ):
+                device_set = pod.metadata.labels['ceph.rook.io/DeviceSet']
+                self.to_remove[device_set] = self.to_remove.get(device_set, 0) + 1
+ self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
+ def _remove_osds(current_cluster, new_cluster):
+ # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
+ assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
+ for _set in new_cluster.spec.storage.storageClassDeviceSets:
+ if _set.name in self.to_remove:
+ if _set.count == self.to_remove[_set.name]:
+ new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
+ else:
+ _set.count = _set.count - self.to_remove[_set.name]
+ return new_cluster
+ return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
+
+ def check_force(self) -> None:
+ if not self.force_flag:
+ safe_args = {'prefix': 'osd safe-to-destroy',
+ 'ids': [str(x) for x in self.osd_ids]}
+ ret, out, err = self.mon_command(safe_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def set_osds_down(self) -> None:
+ down_flag_args = {
+ 'prefix': 'osd down',
+ 'ids': [str(x) for x in self.osd_ids]
+ }
+ ret, out, err = self.mon_command(down_flag_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def scale_deployments(self) -> None:
+ for osd_id in self.osd_ids:
+ self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
+ spec=client.V1ScaleSpec(
+ replicas=0
+ )
+ ))
+
+ def set_osds_out(self) -> None:
+ out_flag_args = {
+ 'prefix': 'osd out',
+ 'ids': [str(x) for x in self.osd_ids]
+ }
+ ret, out, err = self.mon_command(out_flag_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def delete_deployments(self) -> None:
+ for osd_id in self.osd_ids:
+ self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')
+
+ def clean_up_prepare_jobs_and_pvc(self) -> None:
+ for job in self.jobs.items:
+ if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
+ self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
+ self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')
+
+ def purge_osds(self) -> None:
+        for osd_id in self.osd_ids:
+            purge_args = {
+                'prefix': 'osd purge-actual',
+                'id': int(osd_id),
+ 'yes_i_really_mean_it': True
+ }
+ ret, out, err = self.mon_command(purge_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def destroy_osds(self) -> None:
+        for osd_id in self.osd_ids:
+            destroy_args = {
+                'prefix': 'osd destroy-actual',
+                'id': int(osd_id),
+ 'yes_i_really_mean_it': True
+ }
+ ret, out, err = self.mon_command(destroy_args)
+ if ret != 0:
+ raise RuntimeError(err)
+
+ def remove(self) -> str:
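+        """Run the removal workflow: safety check, CR patch, environment cleanup, then mark the OSDs down/out and purge or destroy them."""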
+ try:
+ self.check_force()
+ except Exception as e:
+ log.exception("Error checking if OSDs are safe to destroy")
+ return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
+ try:
+ remove_result = self.remove_device_sets()
+ except Exception as e:
+ log.exception("Error patching ceph cluster CRD")
+ return f"Not possible to modify Ceph cluster CRD: {e}"
+ try:
+ self.scale_deployments()
+ self.delete_deployments()
+ self.clean_up_prepare_jobs_and_pvc()
+ except Exception as e:
+ log.exception("Ceph cluster CRD patched, but error cleaning environment")
+ return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
+ try:
+ self.set_osds_down()
+ self.set_osds_out()
+ if self.replace_flag:
+ self.destroy_osds()
+ else:
+ self.purge_osds()
+ except Exception as e:
+ log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
+ return f"Error removing OSDs from Ceph cluster: {e}"
+
+ return remove_result
+
+
+