update ceph source to reef 18.2.1

diff --git a/ceph/src/pybind/mgr/cephadm/serve.py b/ceph/src/pybind/mgr/cephadm/serve.py
index 877a00cf714b94f9178701772f4ff6cc45e64192..5dfdc27a3ff50377dcfadfdcba1778f63307e5da 100644
@@ -6,11 +6,19 @@ import uuid
 import os
 from collections import defaultdict
 from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
-    DefaultDict
+    DefaultDict, Callable
 
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec, RGWSpec
+from ceph.deployment.service_spec import (
+    ArgumentList,
+    ArgumentSpec,
+    CustomContainerSpec,
+    PlacementSpec,
+    RGWSpec,
+    ServiceSpec,
+    IngressSpec,
+)
 from ceph.utils import datetime_now
 
 import orchestrator
@@ -20,11 +28,12 @@ from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 from cephadm.schedule import HostAssignment
 from cephadm.autotune import MemoryAutotuner
 from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
-    CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
+    CephadmNoImage, CEPH_TYPES, ContainerInspectInfo, SpecialHostLabels
 from mgr_module import MonCommandFailed
 from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException
 
 from . import utils
+from . import exchange
 
 if TYPE_CHECKING:
     from cephadm.module import CephadmOrchestrator
@@ -244,7 +253,7 @@ class CephadmServe:
 
             if (
                 not self.mgr.use_agent
-                or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
+                or self.mgr.cache.is_host_draining(host)
                 or host in agents_down
             ):
                 if self.mgr.cache.host_needs_daemon_refresh(host):
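Note: the hunk above replaces a rebuilt hostname list with a direct cache query, so the per-host metadata refresh runs directly (rather than via the agent) when agents are disabled, the host is draining, or its agent is down. A minimal sketch of what such a HostCache helper could look like (the real method may differ; get_draining_hosts() is assumed from the surrounding code):

    def is_host_draining(self, host: str) -> bool:
        # hosts flagged for draining should not receive new daemons,
        # so the serve loop falls back to a direct (non-agent) refresh
        return host in [h.hostname for h in self.get_draining_hosts()]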
@@ -295,7 +304,7 @@ class CephadmServe:
 
             if (
                     self.mgr.cache.host_needs_autotune_memory(host)
-                    and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
+                    and not self.mgr.inventory.has_label(host, SpecialHostLabels.NO_MEMORY_AUTOTUNE)
             ):
                 self.log.debug(f"autotuning memory for {host}")
                 self._autotune_host_memory(host)
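Note: SpecialHostLabels replaces scattered magic strings such as '_no_autotune_memory'. A best-effort reconstruction of the enum added to cephadm.utils in this update (member list assumed, not verbatim):

    from enum import Enum

    class SpecialHostLabels(str, Enum):
        ADMIN = '_admin'
        NO_MEMORY_AUTOTUNE = '_no_autotune_memory'
        DRAIN_DAEMONS = '_no_schedule'
        DRAIN_CONF_KEYRING = '_no_conf_keyring'

        def to_json(self) -> str:
            return self.value

Because the enum derives from str, has_label(host, SpecialHostLabels.NO_MEMORY_AUTOTUNE) compares equal to the raw label string stored in the inventory.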
@@ -370,11 +379,14 @@ class CephadmServe:
 
     def _refresh_host_devices(self, host: str) -> Optional[str]:
         with_lsm = self.mgr.device_enhanced_scan
+        list_all = self.mgr.inventory_list_all
         inventory_args = ['--', 'inventory',
                           '--format=json-pretty',
                           '--filter-for-batch']
         if with_lsm:
             inventory_args.insert(-1, "--with-lsm")
+        if list_all:
+            inventory_args.insert(-1, "--list-all")
 
         try:
             try:
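Note: insert(-1, ...) places each optional flag immediately before the trailing --filter-for-batch, so the flags compose in a stable order regardless of which toggles are enabled. A quick standalone check:

    inventory_args = ['--', 'inventory',
                      '--format=json-pretty',
                      '--filter-for-batch']
    inventory_args.insert(-1, '--with-lsm')
    inventory_args.insert(-1, '--list-all')
    assert inventory_args == ['--', 'inventory', '--format=json-pretty',
                              '--with-lsm', '--list-all', '--filter-for-batch']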
@@ -687,8 +699,7 @@ class CephadmServe:
                 public_networks = [x.strip() for x in out.split(',')]
                 self.log.debug('mon public_network(s) is %s' % public_networks)
 
-        def matches_network(host):
-            # type: (str) -> bool
+        def matches_public_network(host: str, sspec: ServiceSpec) -> bool:
             # make sure the host has at least one network that belongs to some configured public network(s)
             for pn in public_networks:
                 public_network = ipaddress.ip_network(pn)
@@ -705,6 +716,40 @@ class CephadmServe:
             )
             return False
 
+        def has_interface_for_vip(host: str, sspec: ServiceSpec) -> bool:
+            # make sure the host has an interface that can
+            # actually accommodate the VIP
+            if not sspec or sspec.service_type != 'ingress':
+                return True
+            ingress_spec = cast(IngressSpec, sspec)
+            virtual_ips = []
+            if ingress_spec.virtual_ip:
+                virtual_ips.append(ingress_spec.virtual_ip)
+            elif ingress_spec.virtual_ips_list:
+                virtual_ips = ingress_spec.virtual_ips_list
+            for vip in virtual_ips:
+                found = False
+                bare_ip = str(vip).split('/')[0]
+                for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
+                    if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
+                        # found matching interface for this IP, move on
+                        self.log.debug(
+                            f'{bare_ip} is in {subnet} on {host} interface {list(ifaces.keys())[0]}'
+                        )
+                        found = True
+                        break
+                if not found:
+                    self.log.info(
+                        f"Filtered out host {host}: Host has no interface available for VIP: {vip}"
+                    )
+                    return False
+            return True
+
+        host_filters: Dict[str, Callable[[str, ServiceSpec], bool]] = {
+            'mon': matches_public_network,
+            'ingress': has_interface_for_vip
+        }
+
         rank_map = None
         if svc.ranked():
             rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
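Note: the interface check in has_interface_for_vip relies on the standard-library ipaddress module: the VIP is stripped of any prefix length, then tested for membership in each subnet the cache knows on that host. The core test in isolation:

    import ipaddress

    vip = '10.0.0.10/24'          # illustrative VIP from an ingress spec
    bare_ip = vip.split('/')[0]
    assert ipaddress.ip_address(bare_ip) in ipaddress.ip_network('10.0.0.0/24')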
@@ -717,10 +762,7 @@ class CephadmServe:
             daemons=daemons,
             related_service_daemons=related_service_daemons,
             networks=self.mgr.cache.networks,
-            filter_new_host=(
-                matches_network if service_type == 'mon'
-                else None
-            ),
+            filter_new_host=host_filters.get(service_type, None),
             allow_colo=svc.allow_colo(),
             primary_daemon_type=svc.primary_daemon_type(spec),
             per_host_daemon_type=svc.per_host_daemon_type(spec),
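Note: with the host_filters dispatch table, HostAssignment receives at most one filter_new_host callback per service type and stays agnostic of what that callback checks. A sketch of how such a callback is typically consumed inside the scheduler (the real logic in cephadm/schedule.py is more involved):

    def filter_candidates(
        hosts: List[str],
        spec: ServiceSpec,
        filter_new_host: Optional[Callable[[str, ServiceSpec], bool]],
    ) -> List[str]:
        # keep a candidate only if there is no filter or the filter accepts it
        if filter_new_host is None:
            return hosts
        return [h for h in hosts if filter_new_host(h, spec)]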
@@ -912,7 +954,18 @@ class CephadmServe:
 
             while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
                 # let's find a subset that is ok-to-stop
-                daemons_to_remove.pop()
+                non_error_daemon_index = -1
+                # prioritize removing daemons in error state
+                for i, dmon in enumerate(daemons_to_remove):
+                    if dmon.status != DaemonDescriptionStatus.error:
+                        non_error_daemon_index = i
+                        break
+                if non_error_daemon_index != -1:
+                    daemons_to_remove.pop(non_error_daemon_index)
+                else:
+                    # all daemons in list are in error state
+                    # we should be able to remove all of them
+                    break
             for d in daemons_to_remove:
                 r = True
                 assert d.hostname is not None
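Note: the old loop popped daemons from the tail until the remainder was ok-to-stop; the new one preferentially drops healthy daemons from the removal list, since daemons already in error state cannot lose any more availability. The selection step as a hypothetical standalone helper:

    def index_of_first_non_error(daemons) -> int:
        # -1 means every daemon is errored, so the whole list may be removed
        for i, dmon in enumerate(daemons):
            if dmon.status != DaemonDescriptionStatus.error:
                return i
        return -1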
@@ -933,8 +986,7 @@ class CephadmServe:
                 self.mgr.spec_store.mark_configured(spec.service_name())
             if self.mgr.use_agent:
                # can only send an ack to agents if we know for sure which port they bound to
-                hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
-                                    h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
+                hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and not self.mgr.cache.is_host_draining(h))])
                 self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
 
         if r is None:
@@ -1115,9 +1167,9 @@ class CephadmServe:
                 pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
                 ha = HostAssignment(
                     spec=ServiceSpec('mon', placement=pspec),
-                    hosts=self.mgr.cache.get_schedulable_hosts(),
+                    hosts=self.mgr.cache.get_conf_keyring_available_hosts(),
                     unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
-                    draining_hosts=self.mgr.cache.get_draining_hosts(),
+                    draining_hosts=self.mgr.cache.get_conf_keyring_draining_hosts(),
                     daemons=[],
                     networks=self.mgr.cache.networks,
                 )
@@ -1146,9 +1198,9 @@ class CephadmServe:
                     keyring.encode('utf-8')).digest())
                 ha = HostAssignment(
                     spec=ServiceSpec('mon', placement=ks.placement),
-                    hosts=self.mgr.cache.get_schedulable_hosts(),
+                    hosts=self.mgr.cache.get_conf_keyring_available_hosts(),
                     unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
-                    draining_hosts=self.mgr.cache.get_draining_hosts(),
+                    draining_hosts=self.mgr.cache.get_conf_keyring_draining_hosts(),
                     daemons=[],
                     networks=self.mgr.cache.networks,
                 )
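Note: both HostAssignment call sites above now draw from conf/keyring-specific host sets instead of the general scheduling sets, so a host can keep running daemons while opting out of ceph.conf and client keyring distribution. A hedged sketch of the cache accessors, assuming they key off the '_no_conf_keyring' label (SpecialHostLabels.DRAIN_CONF_KEYRING):

    def get_conf_keyring_available_hosts(self) -> List[HostSpec]:
        # hosts still eligible to receive client config/keyring files
        return [
            h for h in self.get_schedulable_hosts()
            if SpecialHostLabels.DRAIN_CONF_KEYRING not in h.labels
        ]

    def get_conf_keyring_draining_hosts(self) -> List[HostSpec]:
        # hosts explicitly opted out of config/keyring management
        return [
            h for h in self.get_all_hosts()  # hypothetical accessor
            if SpecialHostLabels.DRAIN_CONF_KEYRING in h.labels
        ]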
@@ -1183,7 +1235,7 @@ class CephadmServe:
                             client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
                             host: str) -> None:
         updated_files = False
-        if host in self.mgr.offline_hosts:
+        if self.mgr.cache.is_host_unreachable(host):
             return
         old_files = self.mgr.cache.get_host_client_files(host).copy()
         for path, m in client_files.get(host, {}).items():
@@ -1214,6 +1266,7 @@ class CephadmServe:
                              osd_uuid_map: Optional[Dict[str, Any]] = None,
                              ) -> str:
 
+        daemon_params: Dict[str, Any] = {}
         with set_exception_subject('service', orchestrator.DaemonDescription(
                 daemon_type=daemon_spec.daemon_type,
                 daemon_id=daemon_spec.daemon_id,
@@ -1224,6 +1277,7 @@ class CephadmServe:
                 image = ''
                 start_time = datetime_now()
                 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
+                port_ips: Dict[str, str] = daemon_spec.port_ips if daemon_spec.port_ips else {}
 
                 if daemon_spec.daemon_type == 'container':
                     spec = cast(CustomContainerSpec,
@@ -1234,9 +1288,10 @@
 
                 # TCP port to open in the host firewall
                 if len(ports) > 0:
-                    daemon_spec.extra_args.extend([
-                        '--tcp-ports', ' '.join(map(str, ports))
-                    ])
+                    daemon_params['tcp_ports'] = list(ports)
+
+                if port_ips:
+                    daemon_params['port_ips'] = port_ips
 
                # osd deployments need an --osd-uuid arg
                 if daemon_spec.daemon_type == 'osd':
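Note: options that used to be flattened into daemon_spec.extra_args CLI flags are now accumulated in the structured daemon_params dict and travel inside the JSON deploy payload. For an OSD redeploy with one bound port, the dict could end up looking like this (values illustrative):

    daemon_params = {
        'tcp_ports': [6800],                                  # from daemon_spec.ports
        'osd_fsid': '6c1d6c86-0d45-4cde-9c1a-5b9a6f3e2a10',   # from osd_uuid_map
        'reconfig': True,
    }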
@@ -1245,14 +1300,14 @@ class CephadmServe:
                     osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                     if not osd_uuid:
                         raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
-                    daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
+                    daemon_params['osd_fsid'] = osd_uuid
 
                 if reconfig:
-                    daemon_spec.extra_args.append('--reconfig')
+                    daemon_params['reconfig'] = True
                 if self.mgr.allow_ptrace:
-                    daemon_spec.extra_args.append('--allow-ptrace')
+                    daemon_params['allow_ptrace'] = True
 
-                daemon_spec, extra_container_args, extra_entrypoint_args = self._setup_extra_deployment_args(daemon_spec)
+                daemon_spec, extra_container_args, extra_entrypoint_args = self._setup_extra_deployment_args(daemon_spec, daemon_params)
 
                 if daemon_spec.service_name in self.mgr.spec_store:
                     configs = self.mgr.spec_store[daemon_spec.service_name].spec.custom_configs
@@ -1268,23 +1323,31 @@ class CephadmServe:
                     daemon_spec.name(), daemon_spec.host))
 
                 out, err, code = await self._run_cephadm(
-                    daemon_spec.host, daemon_spec.name(), 'deploy',
-                    [
-                        '--name', daemon_spec.name(),
-                        '--meta-json', json.dumps({
-                            'service_name': daemon_spec.service_name,
-                            'ports': daemon_spec.ports,
-                            'ip': daemon_spec.ip,
-                            'deployed_by': self.mgr.get_active_mgr_digests(),
-                            'rank': daemon_spec.rank,
-                            'rank_generation': daemon_spec.rank_generation,
-                            'extra_container_args': extra_container_args,
-                            'extra_entrypoint_args': extra_entrypoint_args
-                        }),
-                        '--config-json', '-',
-                    ] + daemon_spec.extra_args,
-                    stdin=json.dumps(daemon_spec.final_config),
-                    image=image,
+                    daemon_spec.host,
+                    daemon_spec.name(),
+                    ['_orch', 'deploy'],
+                    [],
+                    stdin=exchange.Deploy(
+                        fsid=self.mgr._cluster_fsid,
+                        name=daemon_spec.name(),
+                        image=image,
+                        params=daemon_params,
+                        meta=exchange.DeployMeta(
+                            service_name=daemon_spec.service_name,
+                            ports=daemon_spec.ports,
+                            ip=daemon_spec.ip,
+                            deployed_by=self.mgr.get_active_mgr_digests(),
+                            rank=daemon_spec.rank,
+                            rank_generation=daemon_spec.rank_generation,
+                            extra_container_args=ArgumentSpec.map_json(
+                                extra_container_args,
+                            ),
+                            extra_entrypoint_args=ArgumentSpec.map_json(
+                                extra_entrypoint_args,
+                            ),
+                        ),
+                        config_blobs=daemon_spec.final_config,
+                    ).dump_json_str(),
                 )
 
                 if daemon_spec.daemon_type == 'agent':
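Note: instead of a long argv tail plus a separate --meta-json blob, the deploy request is now a single JSON document written to stdin of the new '_orch deploy' subcommand. The exchange module imported at the top provides the payload types; roughly, they are thin serializable containers along these lines (a sketch, not the verbatim classes from cephadm/exchange.py):

    import json
    from dataclasses import asdict, dataclass, field
    from typing import Any, Dict, List, Optional

    @dataclass
    class DeployMeta:
        service_name: Optional[str] = None
        ports: Optional[List[int]] = None
        ip: Optional[str] = None
        deployed_by: Optional[List[str]] = None
        rank: Optional[int] = None
        rank_generation: Optional[int] = None
        extra_container_args: Optional[List[Dict[str, Any]]] = None
        extra_entrypoint_args: Optional[List[Dict[str, Any]]] = None

    @dataclass
    class Deploy:
        fsid: str
        name: str
        image: str = ''
        params: Dict[str, Any] = field(default_factory=dict)
        meta: Optional[DeployMeta] = None
        config_blobs: Dict[str, Any] = field(default_factory=dict)

        def dump_json_str(self) -> str:
            # serialize the whole request for transport over stdin
            return json.dumps(asdict(self))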
@@ -1329,35 +1392,33 @@ class CephadmServe:
                     self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True)
                 raise
 
-    def _setup_extra_deployment_args(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[CephadmDaemonDeploySpec, Optional[List[str]], Optional[List[str]]]:
+    def _setup_extra_deployment_args(
+        self,
+        daemon_spec: CephadmDaemonDeploySpec,
+        params: Dict[str, Any],
+    ) -> Tuple[CephadmDaemonDeploySpec, Optional[ArgumentList], Optional[ArgumentList]]:
         # this function is for handling any potential user specified
         # (in the service spec) extra runtime or entrypoint args for a daemon
         # we are going to deploy. Effectively just adds a set of extra args to
         # pass to the cephadm binary to indicate the daemon being deployed
         # needs extra runtime/entrypoint args. Returns the modified daemon spec
         # as well as what args were added (as those are included in unit.meta file)
+        def _to_args(lst: ArgumentList) -> List[str]:
+            out: List[str] = []
+            for argspec in lst:
+                out.extend(argspec.to_args())
+            return out
+
         try:
             eca = daemon_spec.extra_container_args
             if eca:
-                for a in eca:
-                    # args with spaces need to be split into multiple args
-                    # in order to work properly
-                    args = a.split(' ')
-                    for arg in args:
-                        if arg:
-                            daemon_spec.extra_args.append(f'--extra-container-args={arg}')
+                params['extra_container_args'] = _to_args(eca)
         except AttributeError:
             eca = None
         try:
             eea = daemon_spec.extra_entrypoint_args
             if eea:
-                for a in eea:
-                    # args with spaces need to be split into multiple args
-                    # in order to work properly
-                    args = a.split(' ')
-                    for arg in args:
-                        if arg:
-                            daemon_spec.extra_args.append(f'--extra-entrypoint-args={arg}')
+                params['extra_entrypoint_args'] = _to_args(eea)
         except AttributeError:
             eea = None
         return daemon_spec, eca, eea
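Note: where the old code split space-separated strings by hand, extra args now arrive as ArgumentSpec objects whose to_args() decides whether to split. A hedged illustration with a hypothetical stand-in class (the real ArgumentSpec lives in ceph.deployment.service_spec and may differ in detail):

    import shlex
    from dataclasses import dataclass
    from typing import List

    @dataclass
    class ArgSpec:                      # stand-in for ArgumentSpec
        argument: str
        split: bool = False

        def to_args(self) -> List[str]:
            # split on whitespace only when explicitly requested,
            # so arguments containing spaces can be passed through intact
            return shlex.split(self.argument) if self.split else [self.argument]

    assert ArgSpec('--cap-add SYS_PTRACE', split=True).to_args() == ['--cap-add', 'SYS_PTRACE']
    assert ArgSpec('--env FOO=a b').to_args() == ['--env FOO=a b']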
@@ -1431,7 +1492,7 @@ class CephadmServe:
     async def _run_cephadm(self,
                            host: str,
                            entity: Union[CephadmNoImage, str],
-                           command: str,
+                           command: Union[str, List[str]],
                            args: List[str],
                            addr: Optional[str] = "",
                            stdin: Optional[str] = "",
@@ -1496,7 +1557,10 @@ class CephadmServe:
         final_args += ['--timeout', str(timeout)]
 
         # subcommand
-        final_args.append(command)
+        if isinstance(command, list):
+            final_args.extend([str(v) for v in command])
+        else:
+            final_args.append(command)
 
         # subcommand args
         if not no_fsid:
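Note: accepting a list keeps multi-token subcommands such as ['_orch', 'deploy'] as separate argv entries rather than a single quoted string. A reduced example of the branch above (timeout value illustrative):

    final_args = ['--timeout', '895']
    command = ['_orch', 'deploy']
    if isinstance(command, list):
        final_args.extend([str(v) for v in command])
    else:
        final_args.append(command)
    assert final_args == ['--timeout', '895', '_orch', 'deploy']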