X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fpybind%2Fmgr%2Fcephadm%2Fservices%2Fcephadmservice.py;h=8abb0e63a2c102181d7937c3cf8f99c357a4fbb9;hb=33c7a0ef2143973309014ab28861a6fa401a5aa5;hp=65dd7b0f7027b9d6c59c0ebbadaea0290e249b77;hpb=a4b75251e677cd644c8d7771f62d4819aabe7d6c;p=ceph.git diff --git a/ceph/src/pybind/mgr/cephadm/services/cephadmservice.py b/ceph/src/pybind/mgr/cephadm/services/cephadmservice.py index 65dd7b0f7..8abb0e63a 100644 --- a/ceph/src/pybind/mgr/cephadm/services/cephadmservice.py +++ b/ceph/src/pybind/mgr/cephadm/services/cephadmservice.py @@ -2,6 +2,8 @@ import errno import json import logging import re +import socket +import time from abc import ABCMeta, abstractmethod from typing import TYPE_CHECKING, List, Callable, TypeVar, \ Optional, Dict, Any, Tuple, NewType, cast @@ -37,7 +39,8 @@ class CephadmDaemonDeploySpec: ip: Optional[str] = None, ports: Optional[List[int]] = None, rank: Optional[int] = None, - rank_generation: Optional[int] = None): + rank_generation: Optional[int] = None, + extra_container_args: Optional[List[str]] = None): """ A data struction to encapsulate `cephadm deploy ... """ @@ -72,6 +75,8 @@ class CephadmDaemonDeploySpec: self.rank: Optional[int] = rank self.rank_generation: Optional[int] = rank_generation + self.extra_container_args = extra_container_args + def name(self) -> str: return '%s.%s' % (self.daemon_type, self.daemon_id) @@ -96,6 +101,7 @@ class CephadmDaemonDeploySpec: ports=dd.ports, rank=dd.rank, rank_generation=dd.rank_generation, + extra_container_args=dd.extra_container_args, ) def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription: @@ -110,6 +116,7 @@ class CephadmDaemonDeploySpec: ports=self.ports, rank=self.rank, rank_generation=self.rank_generation, + extra_container_args=self.extra_container_args, ) @@ -171,6 +178,10 @@ class CephadmService(metaclass=ABCMeta): rank: Optional[int] = None, rank_generation: Optional[int] = None, ) -> CephadmDaemonDeploySpec: + try: + eca = spec.extra_container_args + except AttributeError: + eca = None return CephadmDaemonDeploySpec( host=host, daemon_id=daemon_id, @@ -181,6 +192,7 @@ class CephadmService(metaclass=ABCMeta): ip=ip, rank=rank, rank_generation=rank_generation, + extra_container_args=eca, ) def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec: @@ -229,9 +241,14 @@ class CephadmService(metaclass=ABCMeta): self.mgr.log.warning(f"Unable to update caps for {entity}") return keyring - def _inventory_get_addr(self, hostname: str) -> str: - """Get a host's address with its hostname.""" - return self.mgr.inventory.get_addr(hostname) + def _inventory_get_fqdn(self, hostname: str) -> str: + """Get a host's FQDN with its hostname. + + If the FQDN can't be resolved, the address from the inventory will + be returned instead. 
+ """ + addr = self.mgr.inventory.get_addr(hostname) + return socket.getfqdn(addr) def _set_service_url_on_dashboard(self, service_name: str, @@ -442,9 +459,10 @@ class CephService(CephadmService): # the CephService class refers to service types, not daemon types if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']: return AuthEntity(f'client.{self.TYPE}.{daemon_id}') - elif self.TYPE == 'crash': + elif self.TYPE in ['crash', 'agent']: if host == "": - raise OrchestratorError("Host not provided to generate auth entity name") + raise OrchestratorError( + f'Host not provided to generate <{self.TYPE}> auth entity name') return AuthEntity(f'client.{self.TYPE}.{host}') elif self.TYPE == 'mon': return AuthEntity('mon.') @@ -655,19 +673,31 @@ class MgrService(CephService): return DaemonDescription() def fail_over(self) -> None: - if not self.mgr_map_has_standby(): - raise OrchestratorError('Need standby mgr daemon', event_kind_subject=( - 'daemon', 'mgr' + self.mgr.get_mgr_id())) - - self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(), - 'INFO', 'Failing over to other MGR') - logger.info('Failing over to other MGR') - - # fail over - ret, out, err = self.mgr.check_mon_command({ - 'prefix': 'mgr fail', - 'who': self.mgr.get_mgr_id(), - }) + # this has been seen to sometimes transiently fail even when there are multiple + # mgr daemons. As long as there are multiple known mgr daemons, we should retry. + class NoStandbyError(OrchestratorError): + pass + no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=( + 'daemon', 'mgr' + self.mgr.get_mgr_id())) + for sleep_secs in [2, 8, 15]: + try: + if not self.mgr_map_has_standby(): + raise no_standby_exc + self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(), + 'INFO', 'Failing over to other MGR') + logger.info('Failing over to other MGR') + + # fail over + ret, out, err = self.mgr.check_mon_command({ + 'prefix': 'mgr fail', + 'who': self.mgr.get_mgr_id(), + }) + return + except NoStandbyError: + logger.info( + f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds') + time.sleep(sleep_secs) + raise no_standby_exc def mgr_map_has_standby(self) -> bool: """ @@ -985,6 +1015,18 @@ class CrashService(CephService): class CephfsMirrorService(CephService): TYPE = 'cephfs-mirror' + def config(self, spec: ServiceSpec) -> None: + # make sure mirroring module is enabled + mgr_map = self.mgr.get('mgr_map') + mod_name = 'mirroring' + if mod_name not in mgr_map.get('services', {}): + self.mgr.check_mon_command({ + 'prefix': 'mgr module enable', + 'module': mod_name + }) + # we shouldn't get here (mon will tell the mgr to respawn), but no + # harm done if we do. 
+
     def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
         assert self.TYPE == daemon_spec.daemon_type
 
@@ -1000,3 +1042,52 @@ class CephfsMirrorService(CephService):
         daemon_spec.keyring = keyring
         daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
         return daemon_spec
+
+
+class CephadmAgent(CephService):
+    TYPE = 'agent'
+
+    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
+        assert self.TYPE == daemon_spec.daemon_type
+        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
+
+        if not self.mgr.cherrypy_thread:
+            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
+
+        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
+        daemon_spec.keyring = keyring
+        self.mgr.agent_cache.agent_keys[host] = keyring
+
+        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
+
+        return daemon_spec
+
+    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
+        try:
+            assert self.mgr.cherrypy_thread
+            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
+            assert self.mgr.cherrypy_thread.server_port
+        except Exception:
+            raise OrchestratorError(
+                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
+
+        cfg = {'target_ip': self.mgr.get_mgr_ip(),
+               'target_port': self.mgr.cherrypy_thread.server_port,
+               'refresh_period': self.mgr.agent_refresh_rate,
+               'listener_port': self.mgr.agent_starting_port,
+               'host': daemon_spec.host,
+               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
+
+        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
+            self.mgr.inventory.get_addr(daemon_spec.host))
+        config = {
+            'agent.json': json.dumps(cfg),
+            'keyring': daemon_spec.keyring,
+            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+            'listener.crt': listener_cert,
+            'listener.key': listener_key,
+        }
+
+        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
+                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
+                               str(self.mgr.get_module_option('device_enhanced_scan'))])
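
Note on the MgrService.fail_over() change in the hunk above: it tolerates the transient case where no standby mgr is visible yet by retrying on a short, fixed back-off schedule before giving up. What follows is a minimal standalone sketch of that retry pattern, separate from the patch itself; has_standby and do_failover are hypothetical callables standing in for mgr_map_has_standby() and the 'mgr fail' mon command, not real cephadm APIs.

import time


class NoStandbyError(Exception):
    pass


def fail_over_with_retry(has_standby, do_failover, backoff=(2, 8, 15)):
    # Attempt the failover; if no standby mgr is visible yet, sleep through
    # the back-off schedule and retry, raising only once it is exhausted.
    for sleep_secs in backoff:
        if has_standby():
            do_failover()
            return
        time.sleep(sleep_secs)
    raise NoStandbyError('Need standby mgr daemon')

In the patch itself, NoStandbyError subclasses OrchestratorError so that the except clause retries only the missing-standby case while any other OrchestratorError raised by check_mon_command still propagates immediately.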