+import datetime
import threading
import functools
import os
-import uuid
+
+from ceph.deployment import inventory
+from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec
+
try:
- from typing import List
+ from typing import List, Dict, Optional, Callable, Any
+ from ceph.deployment.drive_group import DriveGroupSpec
except ImportError:
pass # just for type checking
from kubernetes.client.rest import ApiException
kubernetes_imported = True
+
+ # https://github.com/kubernetes-client/python/issues/895
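+    # Workaround (per that issue): the generated client model treats 'names'
+    # as required and raises if a node reports a container image without
+    # names; overriding the setter keeps list_node() from failing.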
+ from kubernetes.client.models.v1_container_image import V1ContainerImage
+ def names(self, names):
+ self._names = names
+ V1ContainerImage.names = V1ContainerImage.names.setter(names)
+
except ImportError:
kubernetes_imported = False
client = None
from .rook_cluster import RookCluster
-all_completions = []
-
-
-class RookReadCompletion(orchestrator.ReadCompletion):
- """
- All reads are simply API calls: avoid spawning
- huge numbers of threads by just running them
- inline when someone calls wait()
- """
-
- def __init__(self, cb):
- super(RookReadCompletion, self).__init__()
- self.cb = cb
- self._result = None
- self._complete = False
-
- self.message = "<read op>"
-
- # XXX hacky global
- global all_completions
- all_completions.append(self)
-
- @property
- def result(self):
- return self._result
-
- @property
- def is_complete(self):
- return self._complete
-
- def execute(self):
- self._result = self.cb()
- self._complete = True
-
-
-class RookWriteCompletion(orchestrator.WriteCompletion):
- """
- Writes are a two-phase thing, firstly sending
- the write to the k8s API (fast) and then waiting
- for the corresponding change to appear in the
- Ceph cluster (slow)
- """
- # XXX kubernetes bindings call_api already usefully has
- # a completion= param that uses threads. Maybe just
- # use that?
- def __init__(self, execute_cb, complete_cb, message):
- super(RookWriteCompletion, self).__init__()
- self.execute_cb = execute_cb
- self.complete_cb = complete_cb
-
- # Executed means I executed my k8s API call, it may or may
- # not have succeeded
- self.executed = False
-
- # Result of k8s API call, this is set if executed==True
- self._result = None
-
- self.effective = False
-
- self.id = str(uuid.uuid4())
-
- self.message = message
-
- self.error = None
-
- # XXX hacky global
- global all_completions
- all_completions.append(self)
-
- def __str__(self):
- return self.message
-
- @property
- def result(self):
- return self._result
-
- @property
- def is_persistent(self):
- return (not self.is_errored) and self.executed
-
- @property
- def is_effective(self):
- return self.effective
-
- @property
- def is_errored(self):
- return self.error is not None
-
- def execute(self):
- if not self.executed:
- self._result = self.execute_cb()
- self.executed = True
-
- if not self.effective:
- # TODO: check self.result for API errors
- if self.complete_cb is None:
- self.effective = True
- else:
- self.effective = self.complete_cb()
+class RookCompletion(orchestrator.Completion):
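+    # Reads and writes are performed synchronously against the k8s API, so a
+    # completion only needs to be finalized once it is evaluated.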
+ def evaluate(self):
+ self.finalize(None)
def deferred_read(f):
+ # type: (Callable) -> Callable[..., RookCompletion]
"""
Decorator to make RookOrchestrator methods return
a completion object that executes themselves.
    """

    @functools.wraps(f)
def wrapper(*args, **kwargs):
- return RookReadCompletion(lambda: f(*args, **kwargs))
+ return RookCompletion(on_complete=lambda _: f(*args, **kwargs))
return wrapper
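+
+# For example, get_hosts() below is decorated with @deferred_read: calling it
+# returns a RookCompletion, and the underlying Kubernetes query only runs once
+# that completion is evaluated.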
+def write_completion(on_complete, # type: Callable
+ message, # type: str
+ mgr,
+ calc_percent=None # type: Optional[Callable[[], RookCompletion]]
+ ):
+ # type: (...) -> RookCompletion
+ return RookCompletion.with_progress(
+ message=message,
+ mgr=mgr,
+ on_complete=lambda _: on_complete(),
+ calc_percent=calc_percent,
+ )
+
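+# An illustrative use, mirroring the service helpers further down (the mon
+# count of 3 is just an example value):
+#
+#   return write_completion(
+#       lambda: self.rook_cluster.update_mon_count(3),
+#       "Updating mon count to 3",
+#       mgr=self,
+#   )
+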
+
class RookEnv(object):
def __init__(self):
        # POD_NAMESPACE already exists for Rook 0.9
        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0
self.cluster_name = os.environ.get('ROOK_CEPH_CLUSTER_CRD_NAME', self.namespace)
- self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', "rook-ceph-system")
+ self.operator_namespace = os.environ.get('ROOK_OPERATOR_NAMESPACE', self.namespace)
self.crd_version = os.environ.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
self.api_name = "ceph.rook.io/" + self.crd_version
class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
- MODULE_OPTIONS = [
- # TODO: configure k8s API addr instead of assuming local
- ]
-
- def wait(self, completions):
- self.log.info("wait: completions={0}".format(completions))
-
- incomplete = False
+ """
+ Writes are a two-phase thing, firstly sending
+ the write to the k8s API (fast) and then waiting
+ for the corresponding change to appear in the
+ Ceph cluster (slow)
- # Our `wait` implementation is very simple because everything's
- # just an API call.
- for c in completions:
- if not isinstance(c, RookReadCompletion) and \
- not isinstance(c, RookWriteCompletion):
- raise TypeError(
- "wait() requires list of completions, not {0}".format(
- c.__class__
- ))
+    Right now, we are calling the k8s API synchronously.
+ """
- if c.is_complete:
- continue
+ MODULE_OPTIONS = [
+ # TODO: configure k8s API addr instead of assuming local
+ ] # type: List[Dict[str, Any]]
- try:
- c.execute()
- except Exception as e:
- if not isinstance(e, orchestrator.OrchestratorError):
- self.log.exception("Completion {0} threw an exception:".format(
- c.message
- ))
- c.exception = e
- c._complete = True
+ def process(self, completions):
+ # type: (List[RookCompletion]) -> None
- if not c.is_complete:
- incomplete = True
+ if completions:
+ self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
- return not incomplete
+ for p in completions:
+ p.evaluate()
@staticmethod
def can_run():
self._shutdown = threading.Event()
+ self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
+
def shutdown(self):
self._shutdown.set()
@property
def k8s(self):
+ # type: () -> client.CoreV1Api
self._initialized.wait()
+ assert self._k8s is not None
return self._k8s
@property
def rook_cluster(self):
# type: () -> RookCluster
self._initialized.wait()
+ assert self._rook_cluster is not None
return self._rook_cluster
def serve(self):
# a Rook cluster. For development convenience, also support
# running outside (reading ~/.kube config)
- if self._rook_env.cluster_name:
+ if self._rook_env.has_namespace():
config.load_incluster_config()
cluster_name = self._rook_env.cluster_name
else:
# in case we had a caller that wait()'ed on them long enough
# to get persistence but not long enough to get completion
- global all_completions
- self.wait(all_completions)
- all_completions = [c for c in all_completions if not c.is_complete]
+ self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
+ for p in self.all_progress_references:
+ p.update()
self._shutdown.wait(5)
- # TODO: watch Rook for config changes to complain/update if
- # things look a bit out of sync?
+ def cancel_completions(self):
+ for p in self.all_progress_references:
+ p.fail()
+ self.all_progress_references.clear()
@deferred_read
- def get_inventory(self, node_filter=None, refresh=False):
- node_list = None
- if node_filter and node_filter.nodes:
- # Explicit node list
- node_list = node_filter.nodes
- elif node_filter and node_filter.labels:
- # TODO: query k8s API to resolve to node list, and pass
+ def get_inventory(self, host_filter=None, refresh=False):
+ host_list = None
+ if host_filter and host_filter.hosts:
+ # Explicit host list
+ host_list = host_filter.hosts
+ elif host_filter and host_filter.labels:
+ # TODO: query k8s API to resolve to host list, and pass
# it into RookCluster.get_discovered_devices
raise NotImplementedError()
- devs = self.rook_cluster.get_discovered_devices(node_list)
+ devs = self.rook_cluster.get_discovered_devices(host_list)
result = []
- for node_name, node_devs in devs.items():
+ for host_name, host_devs in devs.items():
devs = []
- for d in node_devs:
- dev = orchestrator.InventoryDevice()
-
- # XXX CAUTION! https://github.com/rook/rook/issues/1716
- # Passing this through for the sake of completeness but it
- # is not trustworthy!
- dev.blank = d['empty']
- dev.type = 'hdd' if d['rotational'] else 'ssd'
- dev.id = d['name']
- dev.size = d['size']
-
- if d['filesystem'] == "" and not d['rotational']:
- # Empty or partitioned SSD
- partitioned_space = sum(
- [p['size'] for p in d['Partitions']])
- dev.metadata_space_free = max(0, d[
- 'size'] - partitioned_space)
-
+ for d in host_devs:
+ dev = inventory.Device(
+ path='/dev/' + d['name'],
+ sys_api=dict(
+ rotational='1' if d['rotational'] else '0',
+ size=d['size']
+ ),
+ available=d['empty'],
+ rejected_reasons=[] if d['empty'] else ['not empty'],
+ )
devs.append(dev)
- result.append(orchestrator.InventoryNode(node_name, devs))
+ result.append(orchestrator.InventoryHost(host_name, inventory.Devices(devs)))
return result
@deferred_read
- def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
-
- if service_type not in ("mds", "osd", "mgr", "mon", "nfs", None):
- raise orchestrator.OrchestratorValidationError(service_type + " unsupported")
+ def get_hosts(self):
+ # type: () -> List[orchestrator.HostSpec]
+ return [orchestrator.HostSpec(n) for n in self.rook_cluster.get_node_names()]
- pods = self.rook_cluster.describe_pods(service_type, service_id, node_name)
+ @deferred_read
+ def describe_service(self, service_type=None, service_name=None,
+ refresh=False):
+ now = datetime.datetime.utcnow()
+
+ # CephCluster
+ cl = self.rook_cluster.rook_api_get(
+ "cephclusters/{0}".format(self.rook_cluster.rook_env.cluster_name))
+ self.log.debug('CephCluster %s' % cl)
+ image_name = cl['spec'].get('cephVersion', {}).get('image', None)
+ num_nodes = len(self.rook_cluster.get_node_names())
+
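+        # Build one ServiceDescription per logical service: mon, mgr and crash
+        # are derived from the CephCluster CR; mds and rgw entries are added
+        # below from the CephFilesystem and CephObjectStore CRs.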
+ spec = {}
+ spec['mon'] = orchestrator.ServiceDescription(
+ service_name='mon',
+ spec=ServiceSpec(
+ 'mon',
+ placement=PlacementSpec(
+ count=cl['spec'].get('mon', {}).get('count', 1),
+ ),
+ ),
+ size=cl['spec'].get('mon', {}).get('count', 1),
+ container_image_name=image_name,
+ last_refresh=now,
+ )
+ spec['mgr'] = orchestrator.ServiceDescription(
+ service_name='mgr',
+ spec=ServiceSpec(
+ 'mgr',
+ placement=PlacementSpec.from_string('count:1'),
+ ),
+ size=1,
+ container_image_name=image_name,
+ last_refresh=now,
+ )
+ if not cl['spec'].get('crashCollector', {}).get('disable', False):
+ spec['crash'] = orchestrator.ServiceDescription(
+ service_name='crash',
+ spec=ServiceSpec(
+ 'crash',
+ placement=PlacementSpec.from_string('*'),
+ ),
+ size=num_nodes,
+ container_image_name=image_name,
+ last_refresh=now,
+ )
+
+ # CephFilesystems
+ all_fs = self.rook_cluster.rook_api_get(
+ "cephfilesystems/")
+ self.log.debug('CephFilesystems %s' % all_fs)
+ for fs in all_fs.get('items', []):
+ svc = 'mds.' + fs['metadata']['name']
+ if svc in spec:
+ continue
+ # FIXME: we are conflating active (+ standby) with count
+ active = fs['spec'].get('metadataServer', {}).get('activeCount', 1)
+ total_mds = active
+ if fs['spec'].get('metadataServer', {}).get('activeStandby', False):
+ total_mds = active * 2
+ spec[svc] = orchestrator.ServiceDescription(
+ service_name=svc,
+ spec=ServiceSpec(
+ svc,
+ placement=PlacementSpec(count=active),
+ ),
+ size=total_mds,
+ container_image_name=image_name,
+ last_refresh=now,
+ )
+
+ # CephObjectstores
+ all_zones = self.rook_cluster.rook_api_get(
+ "cephobjectstores/")
+ self.log.debug('CephObjectstores %s' % all_zones)
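+        # The CephObjectStore name is used for both the realm and the zone
+        # when building the rgw service name below.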
+ for zone in all_zones.get('items', []):
+ rgw_realm = zone['metadata']['name']
+ rgw_zone = rgw_realm
+ svc = 'rgw.' + rgw_realm + '.' + rgw_zone
+ if svc in spec:
+ continue
+            active = zone['spec']['gateway']['instances']
+ if 'securePort' in zone['spec']['gateway']:
+ ssl = True
+ port = zone['spec']['gateway']['securePort']
+ else:
+ ssl = False
+ port = zone['spec']['gateway']['port'] or 80
+ spec[svc] = orchestrator.ServiceDescription(
+ service_name=svc,
+ spec=RGWSpec(
+ rgw_realm=rgw_realm,
+ rgw_zone=rgw_zone,
+ ssl=ssl,
+ rgw_frontend_port=port,
+ placement=PlacementSpec(count=active),
+ ),
+ size=active,
+ container_image_name=image_name,
+ last_refresh=now,
+ )
+
+ for dd in self._list_daemons():
+ if dd.service_name() not in spec:
+ continue
+ spec[dd.service_name()].running += 1
+ return [v for k, v in spec.items()]
+ @deferred_read
+ def list_daemons(self, daemon_type=None, daemon_id=None, host=None,
+ refresh=False):
+ return self._list_daemons(daemon_type, daemon_id, host, refresh)
+
+ def _list_daemons(self, daemon_type=None, daemon_id=None, host=None,
+ refresh=False):
+ pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
+ self.log.debug('pods %s' % pods)
result = []
for p in pods:
- sd = orchestrator.ServiceDescription()
- sd.nodename = p['nodename']
+ sd = orchestrator.DaemonDescription()
+ sd.hostname = p['hostname']
sd.container_id = p['name']
- sd.service_type = p['labels']['app'].replace('rook-ceph-', '')
-
- if sd.service_type == "osd":
- sd.service_instance = "%s" % p['labels']["ceph-osd-id"]
- elif sd.service_type == "mds":
- sd.service = p['labels']['rook_file_system']
- pfx = "{0}-".format(sd.service)
- sd.service_instance = p['labels']['ceph_daemon_id'].replace(pfx, '', 1)
- elif sd.service_type == "mon":
- sd.service_instance = p['labels']["mon"]
- elif sd.service_type == "mgr":
- sd.service_instance = p['labels']["mgr"]
- elif sd.service_type == "nfs":
- sd.service = p['labels']['ceph_nfs']
- sd.service_instance = p['labels']['instance']
- sd.rados_config_location = self.rook_cluster.get_nfs_conf_url(sd.service, sd.service_instance)
- elif sd.service_type == "rgw":
- sd.service = p['labels']['rgw']
- sd.service_instance = p['labels']['ceph_daemon_id']
+ sd.daemon_type = p['labels']['app'].replace('rook-ceph-', '')
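+            # Map the pod phase onto the orchestrator's numeric daemon status
+            # (-1 error, 0 stopped, 1 running).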
+ status = {
+ 'Pending': -1,
+ 'Running': 1,
+ 'Succeeded': 0,
+ 'Failed': -1,
+ 'Unknown': -1,
+ }[p['phase']]
+ sd.status = status
+ sd.status_desc = p['phase']
+
+ if 'ceph_daemon_id' in p['labels']:
+ sd.daemon_id = p['labels']['ceph_daemon_id']
+ elif 'ceph-osd-id' in p['labels']:
+ sd.daemon_id = p['labels']['ceph-osd-id']
else:
# Unknown type -- skip it
continue
+ sd.container_image_name = p['container_image_name']
+
+ sd.created = p['created']
+ sd.last_configured = p['created']
+ sd.last_deployed = p['created']
+ sd.started = p['started']
+ sd.last_refresh = p['refreshed']
result.append(sd)
return result
def _service_add_decorate(self, typename, spec, func):
- return RookWriteCompletion(lambda: func(spec), None,
- "Creating {0} services for {1}".format(typename, spec.name))
-
- def add_stateless_service(self, service_type, spec):
- # assert isinstance(spec, orchestrator.StatelessServiceSpec)
- if service_type == "mds":
- return self._service_add_decorate("Filesystem", spec,
- self.rook_cluster.add_filesystem)
- elif service_type == "rgw" :
- return self._service_add_decorate("RGW", spec,
- self.rook_cluster.add_objectstore)
- elif service_type == "nfs" :
- return self._service_add_decorate("NFS", spec,
- self.rook_cluster.add_nfsgw)
- else:
- raise NotImplementedError(service_type)
-
- def remove_stateless_service(self, service_type, service_id):
- return RookWriteCompletion(
- lambda: self.rook_cluster.rm_service(service_type, service_id), None,
- "Removing {0} services for {1}".format(service_type, service_id))
-
- def update_mons(self, num, hosts):
- if hosts:
- raise RuntimeError("Host list is not supported by rook.")
-
- return RookWriteCompletion(
- lambda: self.rook_cluster.update_mon_count(num), None,
- "Updating mon count to {0}".format(num))
-
- def update_stateless_service(self, svc_type, spec):
- # only nfs is currently supported
- if svc_type != "nfs":
- raise NotImplementedError(svc_type)
-
- num = spec.count
- return RookWriteCompletion(
- lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
- "Updating NFS server count in {0} to {1}".format(spec.name, num))
-
- def create_osds(self, drive_group, all_hosts):
- # type: (orchestrator.DriveGroupSpec, List[str]) -> RookWriteCompletion
-
- assert len(drive_group.hosts(all_hosts)) == 1
- targets = []
- if drive_group.data_devices:
- targets += drive_group.data_devices.paths
+ return write_completion(
+            on_complete=lambda: func(spec),
+ message="Creating {} services for {}".format(typename, spec.service_id),
+ mgr=self
+ )
+
+ def add_nfs(self, spec):
+ # type: (NFSServiceSpec) -> RookCompletion
+ return self._service_add_decorate("NFS", spec,
+ self.rook_cluster.add_nfsgw)
+
+ def _service_rm_decorate(self, typename, name, func):
+ return write_completion(
+            on_complete=lambda: func(name),
+ message="Removing {} services for {}".format(typename, name),
+ mgr=self
+ )
+
+ def remove_service(self, service_type, service_name):
+ if service_type == 'mds':
+ return self._service_rm_decorate(
+ 'MDS', service_name, lambda: self.rook_cluster.rm_service(
+ 'cephfilesystems', service_name)
+ )
+ elif service_type == 'rgw':
+ return self._service_rm_decorate(
+ 'RGW', service_name, lambda: self.rook_cluster.rm_service('cephobjectstores', service_name)
+ )
+ elif service_type == 'nfs':
+ return self._service_rm_decorate(
+ 'NFS', service_name, lambda: self.rook_cluster.rm_service('cephnfses', service_name)
+ )
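+        # Other service types (mon, mgr, osd, ...) are not handled here and
+        # fall through without returning a completion.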
+
+ def apply_mon(self, spec):
+ # type: (ServiceSpec) -> RookCompletion
+ if spec.placement.hosts or spec.placement.label:
+ raise RuntimeError("Host list or label is not supported by rook.")
+
+ return write_completion(
+ lambda: self.rook_cluster.update_mon_count(spec.placement.count),
+ "Updating mon count to {0}".format(spec.placement.count),
+ mgr=self
+ )
+
+ def apply_mds(self, spec):
+ # type: (ServiceSpec) -> RookCompletion
+ return self._service_add_decorate('MDS', spec,
+ self.rook_cluster.apply_filesystem)
+
+ def apply_rgw(self, spec):
+ # type: (RGWSpec) -> RookCompletion
+ return self._service_add_decorate('RGW', spec,
+ self.rook_cluster.apply_objectstore)
+
+ def apply_nfs(self, spec):
+ # type: (NFSServiceSpec) -> RookCompletion
+ num = spec.placement.count
+ return write_completion(
+ lambda: self.rook_cluster.update_nfs_count(spec.service_id, num),
+ "Updating NFS server count in {0} to {1}".format(spec.service_id, num),
+ mgr=self
+ )
+
+ def remove_daemons(self, names):
+ return write_completion(
+ lambda: self.rook_cluster.remove_pods(names),
+ "Removing daemons {}".format(','.join(names)),
+ mgr=self
+ )
+
+ def create_osds(self, drive_group):
+ # type: (DriveGroupSpec) -> RookCompletion
+ """ Creates OSDs from a drive group specification.
+
+ $: ceph orch osd create -i <dg.file>
+
+ The drivegroup file must only contain one spec at a time.
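+
+        A minimal sketch of such a spec (host and device names below are
+        placeholders):
+
+            service_type: osd
+            service_id: example_dg
+            placement:
+              host_pattern: 'node-1'
+            data_devices:
+              paths:
+                - /dev/sdb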
+ """
+
+ targets = [] # type: List[str]
+ if drive_group.data_devices and drive_group.data_devices.paths:
+ targets += [d.path for d in drive_group.data_devices.paths]
if drive_group.data_directories:
targets += drive_group.data_directories
- if not self.rook_cluster.node_exists(drive_group.hosts(all_hosts)[0]):
- raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(drive_group.hosts(all_hosts)))
+ def execute(all_hosts_):
+ # type: (List[orchestrator.HostSpec]) -> orchestrator.Completion
+ all_hosts = [h.hostname for h in all_hosts_]
+ matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)
+
+ assert len(matching_hosts) == 1
+
+ if not self.rook_cluster.node_exists(matching_hosts[0]):
+ raise RuntimeError("Node '{0}' is not in the Kubernetes "
+ "cluster".format(matching_hosts))
+
+ # Validate whether cluster CRD can accept individual OSD
+ # creations (i.e. not useAllDevices)
+ if not self.rook_cluster.can_create_osd():
+ raise RuntimeError("Rook cluster configuration does not "
+ "support OSD creation.")
- # Validate whether cluster CRD can accept individual OSD
- # creations (i.e. not useAllDevices)
- if not self.rook_cluster.can_create_osd():
- raise RuntimeError("Rook cluster configuration does not "
- "support OSD creation.")
+ return orchestrator.Completion.with_progress(
+ message="Creating OSD on {0}:{1}".format(
+ matching_hosts,
+ targets),
+ mgr=self,
+            on_complete=lambda _: self.rook_cluster.add_osds(drive_group, all_hosts),
+ calc_percent=lambda: has_osds(all_hosts)
+ )
- def execute():
- return self.rook_cluster.add_osds(drive_group, all_hosts)
+ @deferred_read
+ def has_osds(all_hosts):
+ matching_hosts = drive_group.placement.pattern_matches_hosts(all_hosts)
- def is_complete():
# Find OSD pods on this host
pod_osd_ids = set()
- pods = self._k8s.list_namespaced_pod(self._rook_env.namespace,
+ pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
field_selector="spec.nodeName={0}".format(
- drive_group.hosts(all_hosts)[0]
+ matching_hosts[0]
)).items
for p in pods:
pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
return found is not None
- return RookWriteCompletion(execute, is_complete,
- "Creating OSD on {0}:{1}".format(
- drive_group.hosts(all_hosts)[0], targets
- ))
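+        # Chain the host query and OSD creation: get_hosts() resolves the
+        # candidate hosts, then execute() validates the drive group placement
+        # and creates the OSDs, reporting progress via has_osds().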
+ c = self.get_hosts().then(execute)
+ return c