import datetime
import functools
import os
import threading
from typing import List, Dict, Optional, Callable, Any, TypeVar, Tuple

from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
from ceph.utils import datetime_now

try:
    from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
    pass  # just for type checking
# https://github.com/kubernetes-client/python/issues/895
from kubernetes.client.models.v1_container_image import V1ContainerImage
# Replace the generated V1ContainerImage.names setter with a permissive one,
# working around https://github.com/kubernetes-client/python/issues/895
# (the generated model rejects container images that report no names).
def names(self: Any, names: Any) -> None:
    """Permissive setter: store whatever the API returned, unvalidated."""
    self._names = names
V1ContainerImage.names = V1ContainerImage.names.setter(names)
from .rook_cluster import RookCluster
# Type variables shared by the decorators / generic helpers in this module.
T = TypeVar('T')                                    # unconstrained return type
FuncT = TypeVar('FuncT', bound=Callable)            # decorated callables
ServiceSpecT = TypeVar('ServiceSpecT', bound=ServiceSpec)  # any ServiceSpec subclass
class RookEnv(object):
    """Environment this mgr module runs under, as set by Rook on the mgr pod.

    Reads POD_NAMESPACE and ROOK_CEPH_CLUSTER_CRD_VERSION from the process
    environment and derives the CRD API group/version string from them.
    """

    def __init__(self) -> None:
        # POD_NAMESPACE already exists for Rook 0.9; default to the
        # conventional namespace when running outside a Rook-deployed pod.
        self.namespace = os.environ.get('POD_NAMESPACE', 'rook-ceph')
        self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self) -> bool:
        # Only the 'v1' CRD version is supported by this module.
        return self.crd_version == 'v1'

    def has_namespace(self) -> bool:
        # True when POD_NAMESPACE is set, i.e. we run inside a Rook pod.
        return 'POD_NAMESPACE' in os.environ
]
@staticmethod
- def can_run():
+ def can_run() -> Tuple[bool, str]:
if not kubernetes_imported:
return False, "`kubernetes` python module not found"
if not RookEnv().api_version_match():
else:
return True, "", {}
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Create the orchestrator with empty state.

    The kubernetes API clients and the RookCluster handle start as None;
    presumably they are populated later (serve() reads kube config) —
    NOTE(review): confirm against the full module.
    """
    super(RookOrchestrator, self).__init__(*args, **kwargs)

    # Event signalled once initialization has completed.
    self._initialized = threading.Event()

    # None until initialization; rook_cluster access asserts non-None.
    self._k8s_CoreV1_api: Optional[client.CoreV1Api] = None
    self._k8s_BatchV1_api: Optional[client.BatchV1Api] = None
    self._rook_cluster: Optional[RookCluster] = None
    self._rook_env = RookEnv()

    # Set to stop the serve() loop.
    self._shutdown = threading.Event()
assert self._rook_cluster is not None
return self._rook_cluster
- def serve(self):
+ def serve(self) -> None:
# For deployed clusters, we should always be running inside
# a Rook cluster. For development convenience, also support
# running outside (reading ~/.kube config)
self._shutdown.wait(5)
@handle_orch_error
- def get_inventory(self, host_filter=None, refresh=False):
+ def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
host_list = None
if host_filter and host_filter.hosts:
# Explicit host list
# it into RookCluster.get_discovered_devices
raise NotImplementedError()
- devs = self.rook_cluster.get_discovered_devices(host_list)
+ discovered_devs = self.rook_cluster.get_discovered_devices(host_list)
result = []
- for host_name, host_devs in devs.items():
+ for host_name, host_devs in discovered_devs.items():
devs = []
for d in host_devs:
if 'cephVolumeData' in d and d['cephVolumeData']:
return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
@handle_orch_error
- def describe_service(self, service_type=None, service_name=None,
- refresh=False):
- now = datetime.datetime.utcnow()
+ def describe_service(self,
+ service_type: Optional[str] = None,
+ service_name: Optional[str] = None,
+ refresh: bool = False) -> List[orchestrator.ServiceDescription]:
+ now = datetime_now()
# CephCluster
cl = self.rook_cluster.rook_api_get(
service.container_image_id = dd.container_image_id
if not service.container_image_name:
service.container_image_name = dd.container_image_name
- if not service.last_refresh or not dd.last_refresh or dd.last_refresh < service.last_refresh:
+ if service.last_refresh is None or not dd.last_refresh or dd.last_refresh < service.last_refresh:
service.last_refresh = dd.last_refresh
- if not service.created or dd.created < service.created:
+ if service.created is None or dd.created is None or dd.created < service.created:
service.created = dd.created
return [v for k, v in spec.items()]
@handle_orch_error
def list_daemons(self,
                 service_name: Optional[str] = None,
                 daemon_type: Optional[str] = None,
                 daemon_id: Optional[str] = None,
                 host: Optional[str] = None,
                 refresh: bool = False) -> List[orchestrator.DaemonDescription]:
    """Describe running daemons, optionally filtered by service/daemon/host.

    Thin public entry point: forwards all arguments unchanged to
    _list_daemons(), which performs the actual pod inspection.
    """
    return self._list_daemons(service_name=service_name,
                              daemon_type=daemon_type,
                              daemon_id=daemon_id,
                              host=host,
                              refresh=refresh)
- def _list_daemons(self, service_name=None, daemon_type=None, daemon_id=None, host=None,
- refresh=False):
+ def _list_daemons(self,
+ service_name: Optional[str] = None,
+ daemon_type: Optional[str] = None,
+ daemon_id: Optional[str] = None,
+ host: Optional[str] = None,
+ refresh: bool = False) -> List[orchestrator.DaemonDescription]:
pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
self.log.debug('pods %s' % pods)
result = []