import os
from collections import defaultdict
from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
- DefaultDict
+ DefaultDict, Callable
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec, RGWSpec
+from ceph.deployment.service_spec import (
+ ArgumentList,
+ ArgumentSpec,
+ CustomContainerSpec,
+ PlacementSpec,
+ RGWSpec,
+ ServiceSpec,
+ IngressSpec,
+)
from ceph.utils import datetime_now
import orchestrator
from cephadm.schedule import HostAssignment
from cephadm.autotune import MemoryAutotuner
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
- CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
+ CephadmNoImage, CEPH_TYPES, ContainerInspectInfo, SpecialHostLabels
from mgr_module import MonCommandFailed
from mgr_util import format_bytes, verify_tls, get_cert_issuer_info, ServerConfigException
from . import utils
+from . import exchange
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
if (
not self.mgr.use_agent
- or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
+ or self.mgr.cache.is_host_draining(host)
or host in agents_down
):
if self.mgr.cache.host_needs_daemon_refresh(host):
if (
self.mgr.cache.host_needs_autotune_memory(host)
- and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
+ and not self.mgr.inventory.has_label(host, SpecialHostLabels.NO_MEMORY_AUTOTUNE)
):
self.log.debug(f"autotuning memory for {host}")
self._autotune_host_memory(host)
def _refresh_host_devices(self, host: str) -> Optional[str]:
with_lsm = self.mgr.device_enhanced_scan
+ list_all = self.mgr.inventory_list_all
inventory_args = ['--', 'inventory',
'--format=json-pretty',
'--filter-for-batch']
if with_lsm:
inventory_args.insert(-1, "--with-lsm")
+ if list_all:
+ inventory_args.insert(-1, "--list-all")
try:
try:
public_networks = [x.strip() for x in out.split(',')]
self.log.debug('mon public_network(s) is %s' % public_networks)
- def matches_network(host):
- # type: (str) -> bool
+ def matches_public_network(host: str, sspec: ServiceSpec) -> bool:
# make sure the host has at least one network that belongs to some configured public network(s)
for pn in public_networks:
public_network = ipaddress.ip_network(pn)
)
return False
        def has_interface_for_vip(host: str, sspec: ServiceSpec) -> bool:
            # Host-filter callback for scheduling: make sure the host has an
            # interface that can actually accommodate the VIP. Non-ingress
            # specs are accepted unconditionally.
            if not sspec or sspec.service_type != 'ingress':
                return True
            ingress_spec = cast(IngressSpec, sspec)
            # collect the VIP(s) to check: single virtual_ip takes precedence
            # over the virtual_ips_list form
            virtual_ips = []
            if ingress_spec.virtual_ip:
                virtual_ips.append(ingress_spec.virtual_ip)
            elif ingress_spec.virtual_ips_list:
                virtual_ips = ingress_spec.virtual_ips_list
            for vip in virtual_ips:
                found = False
                # VIPs may be written in CIDR form; compare on the bare address
                bare_ip = str(vip).split('/')[0]
                for subnet, ifaces in self.mgr.cache.networks.get(host, {}).items():
                    if ifaces and ipaddress.ip_address(bare_ip) in ipaddress.ip_network(subnet):
                        # found matching interface for this IP, move on
                        self.log.debug(
                            f'{bare_ip} is in {subnet} on {host} interface {list(ifaces.keys())[0]}'
                        )
                        found = True
                        break
                if not found:
                    # a single unsatisfiable VIP disqualifies the host entirely
                    self.log.info(
                        f"Filtered out host {host}: Host has no interface available for VIP: {vip}"
                    )
                    return False
            return True
+
+ host_filters: Dict[str, Callable[[str, ServiceSpec], bool]] = {
+ 'mon': matches_public_network,
+ 'ingress': has_interface_for_vip
+ }
+
rank_map = None
if svc.ranked():
rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
daemons=daemons,
related_service_daemons=related_service_daemons,
networks=self.mgr.cache.networks,
- filter_new_host=(
- matches_network if service_type == 'mon'
- else None
- ),
+ filter_new_host=host_filters.get(service_type, None),
allow_colo=svc.allow_colo(),
primary_daemon_type=svc.primary_daemon_type(spec),
per_host_daemon_type=svc.per_host_daemon_type(spec),
while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
# let's find a subset that is ok-to-stop
- daemons_to_remove.pop()
+ non_error_daemon_index = -1
+ # prioritize removing daemons in error state
+ for i, dmon in enumerate(daemons_to_remove):
+ if dmon.status != DaemonDescriptionStatus.error:
+ non_error_daemon_index = i
+ break
+ if non_error_daemon_index != -1:
+ daemons_to_remove.pop(non_error_daemon_index)
+ else:
+ # all daemons in list are in error state
+ # we should be able to remove all of them
+ break
for d in daemons_to_remove:
r = True
assert d.hostname is not None
self.mgr.spec_store.mark_configured(spec.service_name())
if self.mgr.use_agent:
# can only send ack to agents if we know for sure port they bound to
- hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
- h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
+ hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and not self.mgr.cache.is_host_draining(h))])
self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
if r is None:
pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
ha = HostAssignment(
spec=ServiceSpec('mon', placement=pspec),
- hosts=self.mgr.cache.get_schedulable_hosts(),
+ hosts=self.mgr.cache.get_conf_keyring_available_hosts(),
unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
- draining_hosts=self.mgr.cache.get_draining_hosts(),
+ draining_hosts=self.mgr.cache.get_conf_keyring_draining_hosts(),
daemons=[],
networks=self.mgr.cache.networks,
)
keyring.encode('utf-8')).digest())
ha = HostAssignment(
spec=ServiceSpec('mon', placement=ks.placement),
- hosts=self.mgr.cache.get_schedulable_hosts(),
+ hosts=self.mgr.cache.get_conf_keyring_available_hosts(),
unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
- draining_hosts=self.mgr.cache.get_draining_hosts(),
+ draining_hosts=self.mgr.cache.get_conf_keyring_draining_hosts(),
daemons=[],
networks=self.mgr.cache.networks,
)
client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
host: str) -> None:
updated_files = False
- if host in self.mgr.offline_hosts:
+ if self.mgr.cache.is_host_unreachable(host):
return
old_files = self.mgr.cache.get_host_client_files(host).copy()
for path, m in client_files.get(host, {}).items():
osd_uuid_map: Optional[Dict[str, Any]] = None,
) -> str:
+ daemon_params: Dict[str, Any] = {}
with set_exception_subject('service', orchestrator.DaemonDescription(
daemon_type=daemon_spec.daemon_type,
daemon_id=daemon_spec.daemon_id,
image = ''
start_time = datetime_now()
ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
+ port_ips: Dict[str, str] = daemon_spec.port_ips if daemon_spec.port_ips else {}
if daemon_spec.daemon_type == 'container':
spec = cast(CustomContainerSpec,
# TCP port to open in the host firewall
if len(ports) > 0:
- daemon_spec.extra_args.extend([
- '--tcp-ports', ' '.join(map(str, ports))
- ])
+ daemon_params['tcp_ports'] = list(ports)
+
+ if port_ips:
+ daemon_params['port_ips'] = port_ips
# osd deployments needs an --osd-uuid arg
if daemon_spec.daemon_type == 'osd':
osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
if not osd_uuid:
raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
- daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
+ daemon_params['osd_fsid'] = osd_uuid
if reconfig:
- daemon_spec.extra_args.append('--reconfig')
+ daemon_params['reconfig'] = True
if self.mgr.allow_ptrace:
- daemon_spec.extra_args.append('--allow-ptrace')
+ daemon_params['allow_ptrace'] = True
- daemon_spec, extra_container_args, extra_entrypoint_args = self._setup_extra_deployment_args(daemon_spec)
+ daemon_spec, extra_container_args, extra_entrypoint_args = self._setup_extra_deployment_args(daemon_spec, daemon_params)
if daemon_spec.service_name in self.mgr.spec_store:
configs = self.mgr.spec_store[daemon_spec.service_name].spec.custom_configs
daemon_spec.name(), daemon_spec.host))
out, err, code = await self._run_cephadm(
- daemon_spec.host, daemon_spec.name(), 'deploy',
- [
- '--name', daemon_spec.name(),
- '--meta-json', json.dumps({
- 'service_name': daemon_spec.service_name,
- 'ports': daemon_spec.ports,
- 'ip': daemon_spec.ip,
- 'deployed_by': self.mgr.get_active_mgr_digests(),
- 'rank': daemon_spec.rank,
- 'rank_generation': daemon_spec.rank_generation,
- 'extra_container_args': extra_container_args,
- 'extra_entrypoint_args': extra_entrypoint_args
- }),
- '--config-json', '-',
- ] + daemon_spec.extra_args,
- stdin=json.dumps(daemon_spec.final_config),
- image=image,
+ daemon_spec.host,
+ daemon_spec.name(),
+ ['_orch', 'deploy'],
+ [],
+ stdin=exchange.Deploy(
+ fsid=self.mgr._cluster_fsid,
+ name=daemon_spec.name(),
+ image=image,
+ params=daemon_params,
+ meta=exchange.DeployMeta(
+ service_name=daemon_spec.service_name,
+ ports=daemon_spec.ports,
+ ip=daemon_spec.ip,
+ deployed_by=self.mgr.get_active_mgr_digests(),
+ rank=daemon_spec.rank,
+ rank_generation=daemon_spec.rank_generation,
+ extra_container_args=ArgumentSpec.map_json(
+ extra_container_args,
+ ),
+ extra_entrypoint_args=ArgumentSpec.map_json(
+ extra_entrypoint_args,
+ ),
+ ),
+ config_blobs=daemon_spec.final_config,
+ ).dump_json_str(),
)
if daemon_spec.daemon_type == 'agent':
self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True)
raise
    def _setup_extra_deployment_args(
        self,
        daemon_spec: CephadmDaemonDeploySpec,
        params: Dict[str, Any],
    ) -> Tuple[CephadmDaemonDeploySpec, Optional[ArgumentList], Optional[ArgumentList]]:
        # this function is for handling any potential user specified
        # (in the service spec) extra runtime or entrypoint args for a daemon
        # we are going to deploy. Effectively just adds a set of extra args to
        # pass to the cephadm binary to indicate the daemon being deployed
        # needs extra runtime/entrypoint args. Returns the modified daemon spec
        # as well as what args were added (as those are included in unit.meta file)
        def _to_args(lst: ArgumentList) -> List[str]:
            # flatten each ArgumentSpec into its plain string arg(s)
            out: List[str] = []
            for argspec in lst:
                out.extend(argspec.to_args())
            return out

        # the specs may predate these fields existing, hence the
        # AttributeError fallbacks below
        try:
            eca = daemon_spec.extra_container_args
            if eca:
                params['extra_container_args'] = _to_args(eca)
        except AttributeError:
            eca = None
        try:
            eea = daemon_spec.extra_entrypoint_args
            if eea:
                params['extra_entrypoint_args'] = _to_args(eea)
        except AttributeError:
            eea = None
        return daemon_spec, eca, eea
async def _run_cephadm(self,
host: str,
entity: Union[CephadmNoImage, str],
- command: str,
+ command: Union[str, List[str]],
args: List[str],
addr: Optional[str] = "",
stdin: Optional[str] = "",
final_args += ['--timeout', str(timeout)]
# subcommand
- final_args.append(command)
+ if isinstance(command, list):
+ final_args.extend([str(v) for v in command])
+ else:
+ final_args.append(command)
# subcommand args
if not no_fsid: