"""
raise NotImplementedError()
- def enter_host_maintenance(self, hostname: str, force: bool = False) -> OrchResult:
+ def enter_host_maintenance(self, hostname: str, force: bool = False, yes_i_really_mean_it: bool = False) -> OrchResult:
"""
Place a host in maintenance, stopping daemons and disabling it's systemd target
"""
"""
raise NotImplementedError()
+ def service_discovery_dump_cert(self) -> OrchResult:
+ """
+ Returns service discovery server root certificate
+
+ :return: service discovery root certificate
+ """
+ raise NotImplementedError()
+
def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, refresh: bool = False) -> OrchResult[List['ServiceDescription']]:
"""
Describe a service (of any kind) that is already configured in
return OrchResult(l_res)
return raise_if_exception(reduce(merge, [fns[spec.service_type](spec) for spec in specs], OrchResult([])))
+ def set_unmanaged(self, service_name: str, value: bool) -> OrchResult[str]:
+ """
+ Set unmanaged parameter to True/False for a given service
+
+ :return: None
+ """
+ raise NotImplementedError()
+
def plan(self, spec: Sequence["GenericSpec"]) -> OrchResult[List]:
"""
Plan (Dry-run, Preview) a List of Specs.
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
force: bool = False,
- zap: bool = False) -> OrchResult[str]:
+ zap: bool = False,
+ no_destroy: bool = False) -> OrchResult[str]:
"""
:param osd_ids: list of OSD IDs
:param replace: marks the OSD as being destroyed. See :ref:`orchestrator-osd-replace`
:param force: Forces the OSD removal process without waiting for the data to be drained first.
:param zap: Zap/Erase all devices associated with the OSDs (DESTROYS DATA)
+ :param no_destroy: Do not destroy associated VGs/LVs with the OSD.
.. note:: this can only remove OSDs that were successfully
"""Update prometheus cluster"""
raise NotImplementedError()
+ def get_prometheus_access_info(self) -> OrchResult[Dict[str, str]]:
+ """get prometheus access information"""
+ raise NotImplementedError()
+
+ def get_alertmanager_access_info(self) -> OrchResult[Dict[str, str]]:
+ """get alertmanager access information"""
+ raise NotImplementedError()
+
def apply_node_exporter(self, spec: ServiceSpec) -> OrchResult[str]:
"""Update existing a Node-Exporter daemon(s)"""
raise NotImplementedError()
'container': 'container',
'agent': 'agent',
'snmp-gateway': 'snmp-gateway',
+ 'elasticsearch': 'elasticsearch',
+ 'jaeger-agent': 'jaeger-agent',
+ 'jaeger-collector': 'jaeger-collector',
+ 'jaeger-query': 'jaeger-query'
}
return mapping[dtype]
'container': ['container'],
'agent': ['agent'],
'snmp-gateway': ['snmp-gateway'],
+ 'elasticsearch': ['elasticsearch'],
+ 'jaeger-agent': ['jaeger-agent'],
+ 'jaeger-collector': ['jaeger-collector'],
+ 'jaeger-query': ['jaeger-query'],
+ 'jaeger-tracing': ['elasticsearch', 'jaeger-query', 'jaeger-collector', 'jaeger-agent']
}
return mapping[stype]
return (daemon_type_to_service(self.daemon_type) + '.' + self.daemon_id).startswith(service_name + '.')
return False
+ def matches_digests(self, digests: Optional[List[str]]) -> bool:
+ # the DaemonDescription class maintains a list of container digests
+ # for the container image last reported as being used for the daemons.
+ # This function checks if any of those digests match any of the digests
+ # in the list of digests provided as an arg to this function
+ if not digests or not self.container_image_digests:
+ return False
+ return any(d in digests for d in self.container_image_digests)
+
+ def matches_image_name(self, image_name: Optional[str]) -> bool:
+ # the DaemonDescription class has an attribute that tracks the image
+ # name of the container image last reported as being used by the daemon.
+ # This function compares if the image name provided as an arg matches
+ # the image name in said attribute
+ if not image_name or not self.container_image_name:
+ return False
+ return image_name == self.container_image_name
+
def service_id(self) -> str:
assert self.daemon_id is not None
assert self.daemon_type is not None
def get_port_summary(self) -> str:
if not self.ports:
return ''
- return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, self.ports or []))}"
+ ports = sorted([int(x) for x in self.ports])
+ return f"{(self.virtual_ip or '?').split('/')[0]}:{','.join(map(str, ports or []))}"
def to_json(self) -> OrderedDict:
out = self.spec.to_json()
Typical use:
- filter by host when presentig UI workflow for configuring
+ filter by host when presenting UI workflow for configuring
a particular server.
filter by label when not all of estate is Ceph servers,
and we want to only learn about the Ceph servers.
>>> import mgr_module
>>> #doctest: +SKIP
- ... class MyImplentation(mgr_module.MgrModule, Orchestrator):
+ ... class MyImplementation(mgr_module.MgrModule, Orchestrator):
... def __init__(self, ...):
... self.orch_client = OrchestratorClientMixin()
... self.orch_client.set_mgr(self.mgr))
def set_mgr(self, mgr: MgrModule) -> None:
"""
- Useable in the Dashbord that uses a global ``mgr``
+ Useable in the Dashboard that uses a global ``mgr``
"""
self.__mgr = mgr # Make sure we're not overwriting any other `mgr` properties