self.rook_env = rook_env
self.inventory = inventory
- self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
- self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, namespace='rook-ceph', label_selector='app=rook-ceph-osd-prepare')
- self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, namespace='rook-ceph')
+ self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
+ namespace=self.rook_env.namespace,
+ label_selector='app=rook-ceph-osd')
+ self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job,
+ namespace=self.rook_env.namespace,
+ label_selector='app=rook-ceph-osd-prepare')
+ self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim,
+ namespace=self.rook_env.namespace)
def remove_device_sets(self) -> str:
def scale_deployments(self) -> None:
for osd_id in self.osd_ids:
- self.appsV1_api.patch_namespaced_deployment_scale(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), body=client.V1Scale(
- spec=client.V1ScaleSpec(
- replicas=0
- )
- ))
+ self.appsV1_api.patch_namespaced_deployment_scale(namespace=self.rook_env.namespace,
+ name='rook-ceph-osd-{}'.format(osd_id),
+ body=client.V1Scale(spec=client.V1ScaleSpec(replicas=0)))
def set_osds_out(self) -> None:
out_flag_args = {
def delete_deployments(self) -> None:
for osd_id in self.osd_ids:
- self.appsV1_api.delete_namespaced_deployment(namespace='rook-ceph', name='rook-ceph-osd-{}'.format(osd_id), propagation_policy='Foreground')
+ self.appsV1_api.delete_namespaced_deployment(namespace=self.rook_env.namespace,
+ name='rook-ceph-osd-{}'.format(osd_id),
+ propagation_policy='Foreground')
def clean_up_prepare_jobs_and_pvc(self) -> None:
for job in self.jobs.items:
if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
- self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace='rook-ceph', propagation_policy='Foreground')
- self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], namespace='rook-ceph', propagation_policy='Foreground')
+ self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace=self.rook_env.namespace,
+ propagation_policy='Foreground')
+ self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'],
+ namespace=self.rook_env.namespace,
+ propagation_policy='Foreground')
def purge_osds(self) -> None:
for id in self.osd_ids:
self.storage_classes : KubernetesResource = KubernetesResource(self.storageV1_api.list_storage_class)
self.rook_pods: KubernetesResource[client.V1Pod] = KubernetesResource(self.coreV1_api.list_namespaced_pod,
- namespace=self.rook_env.namespace,
- label_selector="rook_cluster={0}".format(
- self.rook_env.namespace))
+ namespace=self.rook_env.namespace,
+ label_selector="rook_cluster={0}".format(
+ self.rook_env.namespace))
self.nodes: KubernetesResource[client.V1Node] = KubernetesResource(self.coreV1_api.list_node)
def rook_url(self, path: str) -> str:
return self.fetcher.devices()
def get_osds(self) -> List:
- osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, namespace='rook-ceph', label_selector='app=rook-ceph-osd')
+ osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
+ namespace=self.rook_env.namespace,
+ label_selector='app=rook-ceph-osd')
return list(osd_pods.items)
def get_nfs_conf_url(self, nfs_cluster: str, instance: str) -> Optional[str]:
_update_nfs, _create_nfs)
def rm_service(self, rooktype: str, service_id: str) -> str:
- self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=rooktype, name=service_id)
+ self.customObjects_api.delete_namespaced_custom_object(group="ceph.rook.io", version="v1",
+ namespace=self.rook_env.namespace,
+ plural=rooktype, name=service_id)
objpath = "{0}/{1}".format(rooktype, service_id)
return f'Removed {objpath}'
def get_resource(self, resource_type: str) -> Iterable:
- custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object, group="ceph.rook.io", version="v1", namespace="rook-ceph", plural=resource_type)
+ custom_objects: KubernetesCustomResource = KubernetesCustomResource(self.customObjects_api.list_namespaced_custom_object,
+ group="ceph.rook.io",
+ version="v1",
+ namespace=self.rook_env.namespace,
+ plural=resource_type)
return custom_objects.items
def can_create_osd(self) -> bool:
api_version="batch/v1",
metadata=client.V1ObjectMeta(
name="rook-ceph-device-zap",
- namespace="rook-ceph"
+ namespace=self.rook_env.namespace
),
spec=client.V1JobSpec(
template=client.V1PodTemplateSpec(
)
)
)
- self.batchV1_api.create_namespaced_job('rook-ceph', body)
+ self.batchV1_api.create_namespaced_job(self.rook_env.namespace, body)
def rbd_mirror(self, spec: ServiceSpec) -> None:
service_id = spec.service_id or "default-rbd-mirror"