from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- ServiceSpecValidationError
+ ServiceSpecValidationError, IscsiServiceSpec
from ceph.deployment.drive_group import DriveGroupSpec
from mgr_module import MgrModule, CLICommand, HandleCommandResult
call one completion from another completion. I.e. making them re-usable
using Promises E.g.::
- >>> return Orchestrator().get_hosts().then(self._create_osd)
+ >>> #doctest: +SKIP
+ ... return Orchestrator().get_hosts().then(self._create_osd)
where ``get_hosts`` returns a Completion of list of hosts and
``_create_osd`` takes a list of hosts.
The concept behind this is to store the computation steps
explicit and then explicitly evaluate the chain:
- >>> p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
+ >>> #doctest: +SKIP
+ ... p = Completion(on_complete=lambda x: x*2).then(on_complete=lambda x: str(x))
... p.finalize(2)
... assert p.result = "4"
is actually available in the orchestrator. I.e. this
won't work as expected::
- >>> if OrchestratorClientMixin().available()[0]: # wrong.
+ >>> #doctest: +SKIP
+ ... if OrchestratorClientMixin().available()[0]: # wrong.
... OrchestratorClientMixin().get_hosts()
:return: two-tuple of boolean, string
is actually possible in the orchestrator. I.e. this
won't work as expected::
- >>> api = OrchestratorClientMixin()
+ >>> #doctest: +SKIP
+ ... api = OrchestratorClientMixin()
... if api.get_feature_set()['get_hosts']['available']: # wrong.
... api.get_hosts()
It's better to ask for forgiveness instead::
- >>> try:
+ >>> #doctest: +SKIP
+ ... try:
... OrchestratorClientMixin().get_hosts()
... except (OrchestratorError, NotImplementedError):
... ...
'alertmanager': self.apply_alertmanager,
'crash': self.apply_crash,
'grafana': self.apply_grafana,
+ 'iscsi': cast(Callable[[ServiceSpec], Completion], self.apply_iscsi),
'mds': self.apply_mds,
'mgr': self.apply_mgr,
'mon': self.apply_mon,
"""
raise NotImplementedError()
- def list_specs(self, service_name=None):
- # type: (Optional[str]) -> Completion
- """
- Lists saved service specs
- """
- raise NotImplementedError()
-
def remove_service(self, service_name):
# type: (str) -> Completion
"""
""" Update OSD cluster """
raise NotImplementedError()
+ def set_unmanaged_flag(self, service_name: str, unmanaged_flag: bool) -> HandleCommandResult:
+ raise NotImplementedError()
+
+ def preview_drivegroups(self, drive_group_name: Optional[str] = 'osd',
+ dg_specs: Optional[List[DriveGroupSpec]] = None) -> List[Dict[str, Dict[Any, Any]]]:
+ """ Get a preview for OSD deployments """
+ raise NotImplementedError()
+
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
force: bool = False) -> Completion:
"""Update NFS cluster"""
raise NotImplementedError()
+ def add_iscsi(self, spec):
+ # type: (IscsiServiceSpec) -> Completion
+ """Create iscsi daemon(s)"""
+ raise NotImplementedError()
+
+ def apply_iscsi(self, spec):
+ # type: (IscsiServiceSpec) -> Completion
+ """Update iscsi cluster"""
+ raise NotImplementedError()
+
def add_prometheus(self, spec):
# type: (ServiceSpec) -> Completion
"""Create new prometheus daemon"""
return self.name().startswith(service_name + '.')
return False
- def service_name(self):
+ def service_id(self):
if self.daemon_type == 'rgw':
- v = self.daemon_id.split('.')
- s_name = '.'.join(v[0:2])
- return 'rgw.%s' % s_name
- if self.daemon_type in ['mds', 'nfs']:
- _s_name = self.daemon_id.split('.')[0]
- return '%s.%s' % (self.daemon_type, _s_name)
+ if self.hostname and self.hostname in self.daemon_id:
+ pre, post_ = self.daemon_id.split(self.hostname)
+ return pre[:-1]
+ else:
+ # daemon_id == "realm.zone.host.random"
+ v = self.daemon_id.split('.')
+ if len(v) == 4:
+ return '.'.join(v[0:2])
+ # subcluster or fqdn? undecidable.
+ raise OrchestratorError(f"DaemonDescription: Cannot calculate service_id: {v}")
+ if self.daemon_type in ['mds', 'nfs', 'iscsi']:
+ return self.daemon_id.split('.')[0]
+ return self.daemon_type
+
+ def service_name(self):
+ if self.daemon_type in ['rgw', 'mds', 'nfs', 'iscsi']:
+ return f'{self.daemon_type}.{self.service_id()}'
return self.daemon_type
def __repr__(self):
c[k] = datetime.datetime.strptime(c[k], DATEFMT)
return cls(**c)
+ def __copy__(self):
+ # feel free to change this:
+ return DaemonDescription.from_json(self.to_json())
+
class ServiceDescription(object):
"""
For responding to queries about the status of a particular service,
"""
def __init__(self,
+ spec: ServiceSpec,
container_image_id=None,
container_image_name=None,
- service_name=None,
rados_config_location=None,
service_url=None,
last_refresh=None,
created=None,
size=0,
- running=0,
- spec=None):
+ running=0):
# Not everyone runs in containers, but enough people do to
# justify having the container_image_id (image hash) and container_image
# (image name)
self.container_image_id = container_image_id # image hash
self.container_image_name = container_image_name # image friendly name
- # The service_name is either a bare type (e.g., 'mgr') or
- # type.id combination (e.g., 'mds.fsname' or 'rgw.realm.zone').
- self.service_name = service_name
-
# Location of the service configuration when stored in rados
# object. Format: "rados://<pool>/[<namespace/>]<object>"
self.rados_config_location = rados_config_location
self.last_refresh = last_refresh # type: Optional[datetime.datetime]
self.created = created # type: Optional[datetime.datetime]
- self.spec = spec
+ self.spec: ServiceSpec = spec
def service_type(self):
- if self.service_name:
- return self.service_name.split('.')[0]
- return None
+ return self.spec.service_type
def __repr__(self):
- return "<ServiceDescription>({name})".format(name=self.service_name)
+ return f"<ServiceDescription of {self.spec.one_line_str()}>"
def to_json(self):
- out = {
+ out = self.spec.to_json()
+ status = {
'container_image_id': self.container_image_id,
'container_image_name': self.container_image_name,
- 'service_name': self.service_name,
'rados_config_location': self.rados_config_location,
'service_url': self.service_url,
'size': self.size,
'running': self.running,
- 'spec': self.spec.to_json() if self.spec is not None else None
+ 'last_refresh': self.last_refresh,
+ 'created': self.created
}
for k in ['last_refresh', 'created']:
if getattr(self, k):
- out[k] = getattr(self, k).strftime(DATEFMT)
- return {k: v for (k, v) in out.items() if v is not None}
+ status[k] = getattr(self, k).strftime(DATEFMT)
+ status = {k: v for (k, v) in status.items() if v is not None}
+ out['status'] = status
+ return out
@classmethod
@handle_type_error
- def from_json(cls, data):
+ def from_json(cls, data: dict):
c = data.copy()
+ status = c.pop('status', {})
+ spec = ServiceSpec.from_json(c)
+
+ c_status = status.copy()
for k in ['last_refresh', 'created']:
- if k in c:
- c[k] = datetime.datetime.strptime(c[k], DATEFMT)
- return cls(**c)
+ if k in c_status:
+ c_status[k] = datetime.datetime.strptime(c_status[k], DATEFMT)
+ return cls(spec=spec, **c_status)
class InventoryFilter(object):
name = _data.pop('name')
addr = _data.pop('addr', None) or name
devices = inventory.Devices.from_json(_data.pop('devices'))
+ labels = _data.pop('labels', list())
if _data:
error_msg = 'Unknown key(s) in Inventory: {}'.format(','.join(_data.keys()))
raise OrchestratorValidationError(error_msg)
- labels = _data.get('labels', list())
return cls(name, devices, labels, addr)
except KeyError as e:
error_msg = '{} is required for {}'.format(e, cls.__name__)
>>> import mgr_module
- >>> class MyImplentation(mgr_module.MgrModule, Orchestrator):
+ >>> #doctest: +SKIP
+ ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
... def __init__(self, ...):
... self.orch_client = OrchestratorClientMixin()
... self.orch_client.set_mgr(self.mgr))