import jsonpatch
from urllib.parse import urljoin
+import json
# Optional kubernetes imports to enable MgrModule.can_run
# to behave cleanly.
)
return (node, device)
+
+class PDFetcher(DefaultFetcher):
+ """ Physical Devices Fetcher"""
+ def __init__(self, coreV1_api: 'client.CoreV1Api'):
+ self.coreV1_api = coreV1_api
+
+ def fetch(self) -> None:
+ """ Collect the devices information from k8s configmaps"""
+ self.dev_cms: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_config_map,
+ namespace='rook-ceph',
+ label_selector='app=rook-discover')
+
+ def devices(self) -> Dict[str, List[Device]]:
+ """ Return the list of devices found"""
+ node_devices: Dict[str, List[Device]] = {}
+ for i in self.dev_cms.items:
+ devices_list: List[Device] = []
+ for d in json.loads(i.data['devices']):
+ devices_list.append(self.device(d)[1])
+ node_devices[i.metadata.labels['rook.io/node']] = devices_list
+
+ return node_devices
+
+ def device(self, devData: Dict[str,str]) -> Tuple[str, Device]:
+ """ Build an orchestrator device """
+ if 'cephVolumeData' in devData and devData['cephVolumeData']:
+ return "", Device.from_json(json.loads(devData['cephVolumeData']))
+ else:
+ return "", Device(
+ path='/dev/' + devData['name'],
+ sys_api=dict(
+ rotational='1' if devData['rotational'] else '0',
+ size=devData['size']
+ ),
+ available=False,
+ rejected_reasons=['device data coming from ceph-volume not provided'],
+ )
+
+
class KubernetesResource(Generic[T]):
def __init__(self, api_func: Callable, **kwargs: Any) -> None:
"""
label_selector="rook_cluster={0}".format(
self.rook_env.namespace))
self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
-
+
def rook_url(self, path: str) -> str:
prefix = "/apis/ceph.rook.io/%s/namespaces/%s/" % (
self.rook_env.crd_version, self.rook_env.namespace)
return matching_sc[0]
def get_discovered_devices(self, nodenames: Optional[List[str]] = None) -> Dict[str, List[Device]]:
- storage_class = self.get_storage_class()
self.fetcher: Optional[DefaultFetcher] = None
- if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
- self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
+ op_settings = self.coreV1_api.read_namespaced_config_map(name="rook-ceph-operator-config", namespace='rook-ceph').data
+ if op_settings.get('ROOK_ENABLE_DISCOVERY_DAEMON', 'false').lower() == 'true':
+ self.fetcher = PDFetcher(self.coreV1_api)
else:
- self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
+ storage_class = self.get_storage_class()
+ if storage_class.metadata.labels and ('local.storage.openshift.io/owner-name' in storage_class.metadata.labels):
+ self.fetcher = LSOFetcher(self.storage_class, self.coreV1_api, self.customObjects_api, nodenames)
+ else:
+ self.fetcher = DefaultFetcher(self.storage_class, self.coreV1_api)
+
self.fetcher.fetch()
return self.fetcher.devices()
-
+
def get_osds(self) -> List:
osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
return list(osd_pods.items)
return rook_nfsgw
create_ganesha_pool(mgr)
- NFSRados(mgr_module, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
+ NFSRados(mgr_module.rados, service_id).write_obj('', f'conf-nfs.{spec.service_id}')
return self._create_or_patch(cnfs.CephNFS, 'cephnfses', service_id,
_update_nfs, _create_nfs)