class DefaultFetcher():
- def __init__(self, storage_class: str, coreV1_api: 'client.CoreV1Api'):
- self.storage_class = storage_class
+ def __init__(self, storage_class_name: str, coreV1_api: 'client.CoreV1Api', rook_env: 'RookEnv'):
+ self.storage_class_name = storage_class_name
self.coreV1_api = coreV1_api
+ self.rook_env = rook_env
+ self.pvs_in_sc: List[client.V1PersistentVolumeList] = []
def fetch(self) -> None:
self.inventory: KubernetesResource[client.V1PersistentVolumeList] = KubernetesResource(self.coreV1_api.list_persistent_volume)
- self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class]
+ self.pvs_in_sc = [i for i in self.inventory.items if i.spec.storage_class_name == self.storage_class_name]
def convert_size(self, size_str: str) -> int:
units = ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "", "K", "M", "G", "T", "P", "E")
available = state,
)
return (node, device)
-
+
class LSOFetcher(DefaultFetcher):
- def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
- super().__init__(storage_class, coreV1_api)
+ def __init__(self, storage_class: 'str', coreV1_api: 'client.CoreV1Api', rook_env: 'RookEnv', customObjects_api: 'client.CustomObjectsApi', nodenames: 'Optional[List[str]]' = None):
+ super().__init__(storage_class, coreV1_api, rook_env)
self.customObjects_api = customObjects_api
self.nodenames = nodenames
class PDFetcher(DefaultFetcher):
""" Physical Devices Fetcher"""
- def __init__(self, coreV1_api: 'client.CoreV1Api'):
- self.coreV1_api = coreV1_api
+ def __init__(self, coreV1_api: 'client.CoreV1Api', rook_env: 'RookEnv'):
+ super().__init__('', coreV1_api, rook_env)
def fetch(self) -> None:
""" Collect the devices information from k8s configmaps"""
self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
- namespace='rook-ceph',
+ namespace=self.rook_env.operator_namespace,
label_selector='app=rook-discover')
def devices(self) -> Dict[str, List[Device]]:
self.api_func))
class DefaultCreator():
- def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class: 'str'):
+ def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class_name: 'str'):
self.coreV1_api = coreV1_api
- self.storage_class = storage_class
+ self.storage_class_name = storage_class_name
self.inventory = inventory
def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
name="data"
),
spec=ccl.Spec(
- storageClassName=self.storage_class,
+ storageClassName=self.storage_class_name,
volumeMode="Block",
accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
resources={
storageV1_api: 'client.StorageV1Api',
appsV1_api: 'client.AppsV1Api',
rook_env: 'RookEnv',
- storage_class: 'str'
+ storage_class_name: 'str'
):
self.rook_env = rook_env # type: RookEnv
self.coreV1_api = coreV1_api # client.CoreV1Api
self.customObjects_api = customObjects_api
self.storageV1_api = storageV1_api # client.StorageV1Api
self.appsV1_api = appsV1_api # client.AppsV1Api
- self.storage_class = storage_class # type: str
+ self.storage_class_name = storage_class_name # type: str
# TODO: replace direct k8s calls with Rook API calls
- self.storage_classes : KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
+ self.available_storage_classes : KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
+ self.configured_storage_classes = self.list_storage_classes()
self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
namespace=self.rook_env.namespace,
def rook_api_post(self, path: str, **kwargs: Any) -> Any:
return self.rook_api_call("POST", path, **kwargs)
+ def list_storage_classes(self) -> List[str]:
+ try:
+ crd = self.customObjects_api.get_namespaced_custom_object(
+ group="ceph.rook.io",
+ version="v1",
+ namespace=self.rook_env.namespace,
+ plural="cephclusters",
+ name=self.rook_env.cluster_name)
+
+ sc_devicesets = crd['spec']['storage']['storageClassDeviceSets']
+ sc_names = [vct['spec']['storageClassName'] for sc in sc_devicesets for vct in sc['volumeClaimTemplates']]
+ log.info(f"the cluster has the following configured sc: {sc_names}")
+ return sc_names
+ except Exception as e:
+ log.error(f"unable to list storage classes: {e}")
+ return []
+
+ # TODO: remove all the calls to code that uses rook_cluster.storage_class_name
def get_storage_class(self) -> 'client.V1StorageClass':
- matching_sc = [i for i in self.storage_classes.items if self.storage_class == i.metadata.name]
+ matching_sc = [i for i in self.available_storage_classes.items if self.storage_class_name == i.metadata.name]
if len(matching_sc) == 0:
- log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
+ log.error(f"No storage class exists matching configured Rook orchestrator storage class which currently is <{self.storage_class_name}>. This storage class can be set in ceph config (mgr/rook/storage_class)")
raise Exception('No storage class exists matching name provided in ceph config at mgr/rook/storage_class')
return matching_sc[0]
def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
- self.fetcher: Optional[DefaultFetcher] = None
- op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
+ discovered_devices: Dict[str, List[Device]] = {}
+ op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace=self.rook_env.operator_namespace).data
+ fetcher: Optional[DefaultFetcher] = None
if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
- self.fetcher = PDFetcher(self.coreV1_api)
+ fetcher = PDFetcher(self.coreV1_api, self.rook_env)
+ fetcher.fetch()
+ discovered_devices = fetcher.devices()
else:
- storage_class = self.get_storage_class()
- if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
- self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
- else:
- self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
+ active_storage_classes = [sc for sc in self.available_storage_classes.items if sc.metadata.name in self.configured_storage_classes]
+ for sc in active_storage_classes:
+ if sc.metadata.labels and ('local.storage.openshift.io/owner-name' in sc.metadata.labels):
+ fetcher = LSOFetcher(sc.metadata.name, self.coreV1_api, self.customObjects_api, nodenames)
+ else:
+ fetcher = DefaultFetcher(sc.metadata.name, self.coreV1_api, self.rook_env)
+ fetcher.fetch()
+ discovered_devices.update(fetcher.devices())
- self.fetcher.fetch()
- return self.fetcher.devices()
+ return discovered_devices
def get_osds(self) -> List:
osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
namespace=self.rook_env.namespace,
label_selector='app=rook-ceph-osd')
return list(osd_pods.items)
-
+
def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
#
# Fetch cephnfs object for "nfs_cluster" and then return a rados://
storage_class.metadata.labels
and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
):
- creator = LSOCreator(inventory, self.coreV1_api, self.storage_class)
+ creator = LSOCreator(inventory, self.coreV1_api, self.storage_class_name)
else:
- creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class)
+ creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class_name)
return self._patch(
ccl.CephCluster,
'cephclusters',