import json
import logging
import re
+import socket
+import time
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
Optional, Dict, Any, Tuple, NewType, cast
from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
+from mgr_util import build_url
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils
extra_files: Optional[Dict[str, Any]] = None,
daemon_type: Optional[str] = None,
ip: Optional[str] = None,
- ports: Optional[List[int]] = None):
+ ports: Optional[List[int]] = None,
+ rank: Optional[int] = None,
+ rank_generation: Optional[int] = None,
+ extra_container_args: Optional[List[str]] = None):
"""
A data struction to encapsulate `cephadm deploy ...
"""
self.final_config: Dict[str, Any] = {}
self.deps: List[str] = []
+ self.rank: Optional[int] = rank
+ self.rank_generation: Optional[int] = rank_generation
+
+ self.extra_container_args = extra_container_args
+
def name(self) -> str:
    """Return the daemon's full name, ``<daemon_type>.<daemon_id>``."""
    return f'{self.daemon_type}.{self.daemon_id}'
service_name=dd.service_name(),
ip=dd.ip,
ports=dd.ports,
+ rank=dd.rank,
+ rank_generation=dd.rank_generation,
+ extra_container_args=dd.extra_container_args,
)
def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
    """Convert this deploy spec into a DaemonDescription carrying the
    given status and human-readable status description.
    """
    fields = dict(
        daemon_type=self.daemon_type,
        daemon_id=self.daemon_id,
        service_name=self.service_name,
        hostname=self.host,
        status=status,
        status_desc=status_desc,
        ip=self.ip,
        ports=self.ports,
        rank=self.rank,
        rank_generation=self.rank_generation,
        extra_container_args=self.extra_container_args,
    )
    return DaemonDescription(**fields)
self.mgr: "CephadmOrchestrator" = mgr
def allow_colo(self) -> bool:
    """Whether multiple daemons of this service's type may share one host.

    The base service forbids colocation; services that tolerate it
    override this to return True.
    """
    return False
def primary_daemon_type(self) -> str:
    """Type of the primary (usually the only) daemon deployed for this service."""
    return self.TYPE
+
def per_host_daemon_type(self) -> Optional[str]:
    """Companion daemon type deployed once per host holding primary daemons.

    None (the default) means no per-host companion daemon is deployed.
    """
    return None
- def primary_daemon_type(self) -> str:
- return self.TYPE
def ranked(self) -> bool:
    """Whether daemons of this service are assigned a stable rank (0, 1, ...)
    and a monotonically increasing generation (0, 1, ...) on create/deploy.
    """
    return False
+
def fence_old_ranks(self,
                    spec: ServiceSpec,
                    rank_map: Dict[int, Dict[int, Optional[str]]],
                    num_ranks: int) -> None:
    """Fence daemons holding stale rank generations.

    Only meaningful for ranked services (see ``ranked()``); ranked
    services must override this.  Raise NotImplementedError rather than
    the previous bare ``assert False``, which is silently stripped when
    Python runs with optimizations (-O) and would let a missing override
    go unnoticed.
    """
    raise NotImplementedError()
def make_daemon_spec(
    self,
    host: str,
    daemon_id: str,
    network: str,
    spec: ServiceSpecs,
    daemon_type: Optional[str] = None,
    ports: Optional[List[int]] = None,
    ip: Optional[str] = None,
    rank: Optional[int] = None,
    rank_generation: Optional[int] = None,
) -> CephadmDaemonDeploySpec:
    """Build a CephadmDaemonDeploySpec for one daemon of this service.

    Not every ServiceSpec subclass carries ``extra_container_args``;
    use getattr with a default instead of try/except AttributeError so a
    missing attribute simply yields None.
    """
    return CephadmDaemonDeploySpec(
        host=host,
        daemon_id=daemon_id,
        daemon_type=daemon_type,
        ports=ports,
        ip=ip,
        rank=rank,
        rank_generation=rank_generation,
        extra_container_args=getattr(spec, 'extra_container_args', None),
    )
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
    """Generate the daemon's deploy-time config payload and its dependency list.

    Abstract in the base service; concrete services must override.
    """
    raise NotImplementedError()
- def config(self, spec: ServiceSpec, daemon_id: str) -> None:
+ def config(self, spec: ServiceSpec) -> None:
"""
Configure the cluster for this service. Only called *once* per
service apply. Not for every daemon.
self.mgr.log.warning(f"Unable to update caps for {entity}")
return keyring
def _inventory_get_fqdn(self, hostname: str) -> str:
    """Resolve a host's FQDN from its inventory address.

    socket.getfqdn() returns its input unchanged when resolution fails,
    so in that case the inventory address itself is returned.
    """
    return socket.getfqdn(self.mgr.inventory.get_addr(hostname))
def _set_service_url_on_dashboard(self,
service_name: str,
assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
    """Hook invoked after the daemon has been removed.

    The base implementation is a no-op; subclasses override to clean up
    keyrings, config keys, etc.  ``is_failed_deploy`` presumably flags a
    removal triggered by a failed deployment — confirm with callers.
    """
return cephadm_config, []
def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
    """Run the base-class post-removal steps, then drop the daemon's keyring."""
    super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
    self.remove_keyring(daemon)
def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
# the CephService class refers to service types, not daemon types
if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
- elif self.TYPE == 'crash':
+ elif self.TYPE in ['crash', 'agent']:
if host == "":
- raise OrchestratorError("Host not provided to generate <crash> auth entity name")
+ raise OrchestratorError(
+ f'Host not provided to generate <{self.TYPE}> auth entity name')
return AuthEntity(f'client.{self.TYPE}.{host}')
elif self.TYPE == 'mon':
return AuthEntity('mon.')
daemon_id: str = daemon.daemon_id
host: str = daemon.hostname
- if daemon_id == 'mon':
- # do not remove the mon keyring
- return
+ assert daemon.daemon_type != 'mon'
entity = self.get_auth_entity(daemon_id, host=host)
'name': daemon_id,
})
def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
    """Intentionally do NOT call CephService.post_remove().

    The superclass implementation would remove the daemon's keyring, and
    the mon keyring must never be deleted when a mon daemon is removed.
    """
+
class MgrService(CephService):
TYPE = 'mgr'
def allow_colo(self) -> bool:
    """Whether multiple mgr daemons may share a single host.

    In traditional mgr mode (mgr_standby_modules enabled) standby
    daemons' modules listen on ports and redirect to the primary, so
    colocated mgrs would fight over ports — forbid colocation.  When the
    option is off, standby daemons do nothing and port conflicts are not
    a concern.
    """
    return not self.mgr.get_ceph_option('mgr_standby_modules')
+
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
"""
Create a new manager instance on a host.
return DaemonDescription()
def fail_over(self) -> None:
    """Fail the active mgr over to a standby.

    The standby check has been seen to transiently fail even when
    multiple mgr daemons exist, so retry with growing sleeps while no
    standby is visible and only give up after the last attempt.

    Raises an OrchestratorError subclass when no standby mgr can be
    found after all retries.
    """
    class NoStandbyError(OrchestratorError):
        pass

    no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
        'daemon', 'mgr' + self.mgr.get_mgr_id()))
    for sleep_secs in [2, 8, 15]:
        try:
            if not self.mgr_map_has_standby():
                raise no_standby_exc
            self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                       'INFO', 'Failing over to other MGR')
            logger.info('Failing over to other MGR')

            # Trigger the failover; the command's return values were
            # previously bound to unused variables and are not needed.
            self.mgr.check_mon_command({
                'prefix': 'mgr fail',
                'who': self.mgr.get_mgr_id(),
            })
            return
        except NoStandbyError:
            logger.info(
                f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
            time.sleep(sleep_secs)
    raise no_standby_exc
def mgr_map_has_standby(self) -> bool:
"""
def allow_colo(self) -> bool:
    """Multiple daemons of this service type may share a single host."""
    return True
- def config(self, spec: ServiceSpec, daemon_id: str) -> None:
+ def config(self, spec: ServiceSpec) -> None:
assert self.TYPE == spec.service_type
assert spec.service_id
def allow_colo(self) -> bool:
    """Multiple daemons of this service type may share a single host."""
    return True
- def config(self, spec: RGWSpec, rgw_id: str) -> None: # type: ignore
+ def config(self, spec: RGWSpec) -> None: # type: ignore
assert self.TYPE == spec.service_type
# set rgw_realm and rgw_zone, if present
logger.info('Saving service %s spec with placement %s' % (
spec.service_name(), spec.placement.pretty_str()))
self.mgr.spec_store.save(spec)
+ self.mgr.trigger_connect_dashboard_rgw()
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
if ftype == 'beast':
if spec.ssl:
if daemon_spec.ip:
- args.append(f"ssl_endpoint={daemon_spec.ip}:{port}")
+ args.append(
+ f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
else:
args.append(f"ssl_port={port}")
args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
else:
if daemon_spec.ip:
- args.append(f"endpoint={daemon_spec.ip}:{port}")
+ args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
else:
args.append(f"port={port}")
elif ftype == 'civetweb':
if spec.ssl:
if daemon_spec.ip:
- args.append(f"port={daemon_spec.ip}:{port}s") # note the 's' suffix on port
+ # note the 's' suffix on port
+ args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
else:
args.append(f"port={port}s") # note the 's' suffix on port
args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
else:
if daemon_spec.ip:
- args.append(f"port={daemon_spec.ip}:{port}")
+ args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
else:
args.append(f"port={port}")
frontend = f'{ftype} {" ".join(args)}'
'prefix': 'config-key rm',
'key': f'rgw/cert/{service_name}',
})
+ self.mgr.trigger_connect_dashboard_rgw()
- def post_remove(self, daemon: DaemonDescription) -> None:
- super().post_remove(daemon)
+ def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
+ super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
self.mgr.check_mon_command({
'prefix': 'config rm',
'who': utils.name_to_config_section(daemon.name()),
warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
return HandleCommandResult(-errno.EBUSY, '', warn_message)
def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
    """Ask the mgr to (re)connect the dashboard to RGW."""
    self.mgr.trigger_connect_dashboard_rgw()
+
class RbdMirrorService(CephService):
TYPE = 'rbd-mirror'
class CephfsMirrorService(CephService):
TYPE = 'cephfs-mirror'
def config(self, spec: ServiceSpec) -> None:
    """Ensure the mgr 'mirroring' module is enabled for cephfs mirroring."""
    mgr_map = self.mgr.get('mgr_map')
    if 'mirroring' not in mgr_map.get('services', {}):
        # Enabling a module makes the mon tell the mgr to respawn, so we
        # normally won't run past this point — but no harm done if we do.
        self.mgr.check_mon_command({
            'prefix': 'mgr module enable',
            'module': 'mirroring'
        })
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
assert self.TYPE == daemon_spec.daemon_type
ret, keyring, err = self.mgr.check_mon_command({
'prefix': 'auth get-or-create',
'entity': self.get_auth_entity(daemon_spec.daemon_id),
- 'caps': ['mon', 'allow r',
+ 'caps': ['mon', 'profile cephfs-mirror',
'mds', 'allow r',
'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
'mgr', 'allow r'],
daemon_spec.keyring = keyring
daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
return daemon_spec
+
+
class CephadmAgent(CephService):
    """Service handler for the cephadm agent daemon.

    The agent runs on each managed host and talks to the mgr's cephadm
    HTTP endpoint (``self.mgr.cherrypy_thread``), so deployment is only
    possible once that endpoint (and its certificates) exist.
    """

    # Daemon/service type handled by this class.
    TYPE = 'agent'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Fill in keyring and config for a new agent daemon.

        Raises OrchestratorError if the cephadm endpoint is not up yet,
        since the agent would have nothing to talk to.
        """
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        if not self.mgr.cherrypy_thread:
            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')

        # NOTE(review): keyring is created with an empty caps list —
        # presumably identification-only; confirm against get_keyring_with_caps.
        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
        daemon_spec.keyring = keyring
        # Cache the keyring so the mgr side can authenticate this host's agent.
        self.mgr.agent_cache.agent_keys[host] = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        """Build the agent's config payload and its dependency list.

        Returns a tuple of (config files dict, sorted dependency strings);
        the deps cause a redeploy when the mgr IP/port, root cert, or the
        device_enhanced_scan option change.
        """
        # NOTE(review): these asserts double as validation and are stripped
        # under `python -O`; the except below then never fires for them.
        try:
            assert self.mgr.cherrypy_thread
            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            assert self.mgr.cherrypy_thread.server_port
        except Exception:
            raise OrchestratorError(
                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')

        # Settings the agent reads from its agent.json on the host.
        cfg = {'target_ip': self.mgr.get_mgr_ip(),
               'target_port': self.mgr.cherrypy_thread.server_port,
               'refresh_period': self.mgr.agent_refresh_rate,
               'listener_port': self.mgr.agent_starting_port,
               'host': daemon_spec.host,
               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}

        # Per-host cert/key for the agent's own listener, signed by the
        # cephadm endpoint's root cert.
        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
            self.mgr.inventory.get_addr(daemon_spec.host))
        config = {
            'agent.json': json.dumps(cfg),
            'keyring': daemon_spec.keyring,
            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
            'listener.crt': listener_cert,
            'listener.key': listener_key,
        }

        # NOTE(review): cfg reads self.mgr.device_enhanced_scan while the
        # deps read get_module_option('device_enhanced_scan') — confirm the
        # two always agree.
        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
                               str(self.mgr.get_module_option('device_enhanced_scan'))])