from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from .utils import resolve_ip
from .migrations import queue_migrate_nfs_spec
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
+AGENT_CACHE_PREFIX = 'agent.'
class Inventory:
self.assert_host(host)
return self._inventory[host].get('addr', host)
- def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
- for h, hostspec in self._inventory.items():
- if not label or label in hostspec.get('labels', []):
- if as_hostspec:
- yield self.spec_from_dict(hostspec)
- else:
- yield h
-
def spec_from_dict(self, info: dict) -> HostSpec:
hostname = info['hostname']
return HostSpec(
self.osdspec_previews = {} # type: Dict[str, List[Dict[str, Any]]]
self.osdspec_last_applied = {} # type: Dict[str, Dict[str, datetime.datetime]]
self.networks = {} # type: Dict[str, Dict[str, Dict[str, List[str]]]]
+ self.last_network_update = {} # type: Dict[str, datetime.datetime]
self.last_device_update = {} # type: Dict[str, datetime.datetime]
self.last_device_change = {} # type: Dict[str, datetime.datetime]
self.daemon_refresh_queue = [] # type: List[str]
self.device_refresh_queue = [] # type: List[str]
+ self.network_refresh_queue = [] # type: List[str]
self.osdspec_previews_refresh_queue = [] # type: List[str]
# host -> daemon name -> dict
self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}
+ self.metadata_up_to_date = {} # type: Dict[str, bool]
+
def load(self):
# type: () -> None
for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
# for services, we ignore the persisted last_*_update
# and always trigger a new scrape on mgr restart.
self.daemon_refresh_queue.append(host)
+ self.network_refresh_queue.append(host)
self.daemons[host] = {}
self.osdspec_previews[host] = []
self.osdspec_last_applied[host] = {}
self.last_host_check[host] = str_to_datetime(j['last_host_check'])
self.registry_login_queue.add(host)
self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
+ self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)
self.mgr.log.debug(
'HostCache.load: host %s has %d daemons, '
return True
return False
- def update_host_devices_networks(
+ def update_host_devices(
self,
host: str,
dls: List[inventory.Device],
- nets: Dict[str, Dict[str, List[str]]]
) -> None:
if (
host not in self.devices
self.last_device_change[host] = datetime_now()
self.last_device_update[host] = datetime_now()
self.devices[host] = dls
+
+ def update_host_networks(
+ self,
+ host: str,
+ nets: Dict[str, Dict[str, List[str]]]
+ ) -> None:
self.networks[host] = nets
+ self.last_network_update[host] = datetime_now()
def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
self.daemon_config_deps[host][name] = {
self.daemon_config_deps[host] = {}
self.daemon_refresh_queue.append(host)
self.device_refresh_queue.append(host)
+ self.network_refresh_queue.append(host)
self.osdspec_previews_refresh_queue.append(host)
self.registry_login_queue.add(host)
self.last_client_files[host] = {}
del self.last_device_update[host]
self.mgr.event.set()
+ def invalidate_host_networks(self, host):
+ # type: (str) -> None
+ self.network_refresh_queue.append(host)
+ if host in self.last_network_update:
+ del self.last_network_update[host]
+ self.mgr.event.set()
+
def distribute_new_registry_login_info(self) -> None:
self.registry_login_queue = set(self.mgr.inventory.keys())
j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
if host in self.last_device_update:
j['last_device_update'] = datetime_to_str(self.last_device_update[host])
+ if host in self.last_network_update:
+ j['last_network_update'] = datetime_to_str(self.last_network_update[host])
if host in self.last_device_change:
j['last_device_change'] = datetime_to_str(self.last_device_change[host])
if host in self.daemons:
j['last_client_files'] = self.last_client_files[host]
if host in self.scheduled_daemon_actions:
j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
+ if host in self.metadata_up_to_date:
+ j['metadata_up_to_date'] = self.metadata_up_to_date[host]
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
del self.last_daemon_update[host]
if host in self.last_device_update:
del self.last_device_update[host]
+ if host in self.last_network_update:
+ del self.last_network_update[host]
if host in self.last_device_change:
del self.last_device_change[host]
if host in self.daemon_config_deps:
def get_hosts(self):
# type: () -> List[str]
- r = []
- for host, di in self.daemons.items():
- r.append(host)
- return r
+ return list(self.daemons)
+
+ def get_schedulable_hosts(self) -> List[HostSpec]:
+ """
+ Returns all usable hosts that went through _refresh_host_daemons().
+
+ This mitigates a potential race, where new host was added *after*
+ ``_refresh_host_daemons()`` was called, but *before*
+ ``_apply_all_specs()`` was called. thus we end up with a hosts
+ where daemons might be running, but we have not yet detected them.
+ """
+ return [
+ h for h in self.mgr.inventory.all_specs()
+ if (
+ self.host_had_daemon_refresh(h.hostname)
+ and '_no_schedule' not in h.labels
+ )
+ ]
+
+ def get_non_draining_hosts(self) -> List[HostSpec]:
+ """
+ Returns all hosts that do not have _no_schedule label.
+
+ Useful for the agent who needs this specific list rather than the
+ schedulable_hosts since the agent needs to be deployed on hosts with
+ no daemon refresh
+ """
+ return [
+ h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
+ ]
+
+ def get_unreachable_hosts(self) -> List[HostSpec]:
+ """
+ Return all hosts that are offline or in maintenance mode.
+
+ The idea is we should not touch the daemons on these hosts (since
+ in theory the hosts are inaccessible so we CAN'T touch them) but
+ we still want to count daemons that exist on these hosts toward the
+ placement so daemons on these hosts aren't just moved elsewhere
+ """
+ return [
+ h for h in self.mgr.inventory.all_specs()
+ if (
+ h.status.lower() in ['maintenance', 'offline']
+ or h.hostname in self.mgr.offline_hosts
+ )
+ ]
def get_facts(self, host: str) -> Dict[str, Any]:
return self.facts.get(host, {})
+ def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+ for dm in self.daemons.copy().values():
+ yield from dm.values()
+
def get_daemons(self):
# type: () -> List[orchestrator.DaemonDescription]
+ return list(self._get_daemons())
+
+ def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
r = []
- for host, dm in self.daemons.items():
- for name, dd in dm.items():
+ for dd in self._get_daemons():
+ if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
r.append(dd)
return r
def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
return list(self.daemons.get(host, {}).values())
- def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
+ def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
assert not daemon_name.startswith('ha-rgw.')
- for _, dm in self.daemons.items():
- for _, dd in dm.items():
- if dd.name() == daemon_name:
- return dd
+ dds = self.get_daemons_by_host(host) if host else self._get_daemons()
+ for dd in dds:
+ if dd.name() == daemon_name:
+ return dd
+
raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
- def has_daemon(self, daemon_name: str) -> bool:
+ def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
try:
- self.get_daemon(daemon_name)
+ self.get_daemon(daemon_name, host)
except orchestrator.OrchestratorError:
return False
return True
# We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
# could be wrong. We must assume maintenance is working and daemons are stopped
dd.status = orchestrator.DaemonDescriptionStatus.stopped
- dd.status_desc = 'stopped'
dd.events = self.mgr.events.get_for_daemon(dd.name())
return dd
- for host, dm in self.daemons.items():
+ for host, dm in self.daemons.copy().items():
yield host, {name: alter(host, d) for name, d in dm.items()}
def get_daemons_by_service(self, service_name):
assert not service_name.startswith('keepalived.')
assert not service_name.startswith('haproxy.')
- result = [] # type: List[orchestrator.DaemonDescription]
- for host, dm in self.daemons.items():
- for name, d in dm.items():
- if d.service_name() == service_name:
- result.append(d)
- return result
+ return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
- def get_daemons_by_type(self, service_type):
- # type: (str) -> List[orchestrator.DaemonDescription]
+ def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
assert service_type not in ['keepalived', 'haproxy']
- result = [] # type: List[orchestrator.DaemonDescription]
- for host, dm in self.daemons.items():
- for name, d in dm.items():
- if d.daemon_type in service_to_daemon_types(service_type):
- result.append(d)
- return result
+ daemons = self.daemons[host].values() if host else self._get_daemons()
- def get_daemon_types(self, hostname: str) -> List[str]:
+ return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]
+
+ def get_daemon_types(self, hostname: str) -> Set[str]:
"""Provide a list of the types of daemons on the host"""
- result = set()
- for _d, dm in self.daemons[hostname].items():
- assert dm.daemon_type is not None, f'no daemon type for {dm!r}'
- result.add(dm.daemon_type)
- return list(result)
+ return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})
def get_daemon_names(self):
# type: () -> List[str]
- r = []
- for host, dm in self.daemons.items():
- for name, dd in dm.items():
- r.append(name)
- return r
+ return [d.name() for d in self._get_daemons()]
def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
if host in self.daemon_config_deps:
seconds=self.mgr.daemon_cache_timeout)
if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
return True
+ if not self.mgr.cache.host_metadata_up_to_date(host):
+ return True
return False
def host_needs_facts_refresh(self, host):
seconds=self.mgr.facts_cache_timeout)
if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
return True
+ if not self.mgr.cache.host_metadata_up_to_date(host):
+ return True
return False
def host_needs_autotune_memory(self, host):
seconds=self.mgr.device_cache_timeout)
if host not in self.last_device_update or self.last_device_update[host] < cutoff:
return True
+ if not self.mgr.cache.host_metadata_up_to_date(host):
+ return True
+ return False
+
+ def host_needs_network_refresh(self, host):
+ # type: (str) -> bool
+ if host in self.mgr.offline_hosts:
+ logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
+ return False
+ if host in self.network_refresh_queue:
+ self.network_refresh_queue.remove(host)
+ return True
+ cutoff = datetime_now() - datetime.timedelta(
+ seconds=self.mgr.device_cache_timeout)
+ if host not in self.last_network_update or self.last_network_update[host] < cutoff:
+ return True
+ if not self.mgr.cache.host_metadata_up_to_date(host):
+ return True
return False
def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
return True
return False
+ def host_metadata_up_to_date(self, host: str) -> bool:
+ if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
+ return False
+ return True
+
+ def all_host_metadata_up_to_date(self) -> bool:
+ unreachables = [h.hostname for h in self.get_unreachable_hosts()]
+ if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
+ # this function is primarily for telling if it's safe to try and apply a service
+ # spec. Since offline/maintenance hosts aren't considered in that process anyway
+ # we don't want to return False if the host without up-to-date metadata is in one
+ # of those two categories.
+ return False
+ return True
+
def add_daemon(self, host, dd):
# type: (str, orchestrator.DaemonDescription) -> None
assert host in self.daemons
self.scheduled_daemon_actions[host] = {}
self.scheduled_daemon_actions[host][daemon_name] = action
- def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
+ def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
+ found = False
if host in self.scheduled_daemon_actions:
if daemon_name in self.scheduled_daemon_actions[host]:
del self.scheduled_daemon_actions[host][daemon_name]
+ found = True
if not self.scheduled_daemon_actions[host]:
del self.scheduled_daemon_actions[host]
+ return found
def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
assert not daemon.startswith('ha-rgw.')
return self.scheduled_daemon_actions.get(host, {}).get(daemon)
+class AgentCache():
+ """
+ AgentCache is used for storing metadata about agent daemons that must be kept
+ through MGR failovers
+ """
+
+ def __init__(self, mgr):
+ # type: (CephadmOrchestrator) -> None
+ self.mgr: CephadmOrchestrator = mgr
+ self.agent_config_deps = {} # type: Dict[str, Dict[str,Any]]
+ self.agent_counter = {} # type: Dict[str, int]
+ self.agent_timestamp = {} # type: Dict[str, datetime.datetime]
+ self.agent_keys = {} # type: Dict[str, str]
+ self.agent_ports = {} # type: Dict[str, int]
+ self.sending_agent_message = {} # type: Dict[str, bool]
+
+ def load(self):
+ # type: () -> None
+ for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
+ host = k[len(AGENT_CACHE_PREFIX):]
+ if host not in self.mgr.inventory:
+ self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
+ host))
+ self.mgr.set_store(k, None)
+ try:
+ j = json.loads(v)
+ self.agent_config_deps[host] = {}
+ conf_deps = j.get('agent_config_deps', {})
+ if conf_deps:
+ conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
+ self.agent_config_deps[host] = conf_deps
+ self.agent_counter[host] = int(j.get('agent_counter', 1))
+ self.agent_timestamp[host] = str_to_datetime(
+ j.get('agent_timestamp', datetime_to_str(datetime_now())))
+ self.agent_keys[host] = str(j.get('agent_keys', ''))
+ agent_port = int(j.get('agent_ports', 0))
+ if agent_port:
+ self.agent_ports[host] = agent_port
+
+ except Exception as e:
+ self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
+ host, e))
+ pass
+
+ def save_agent(self, host: str) -> None:
+ j: Dict[str, Any] = {}
+ if host in self.agent_config_deps:
+ j['agent_config_deps'] = {
+ 'deps': self.agent_config_deps[host].get('deps', []),
+ 'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
+ }
+ if host in self.agent_counter:
+ j['agent_counter'] = self.agent_counter[host]
+ if host in self.agent_keys:
+ j['agent_keys'] = self.agent_keys[host]
+ if host in self.agent_ports:
+ j['agent_ports'] = self.agent_ports[host]
+ if host in self.agent_timestamp:
+ j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])
+
+ self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))
+
+ def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
+ self.agent_config_deps[host] = {
+ 'deps': deps,
+ 'last_config': stamp,
+ }
+
+ def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
+ if host in self.agent_config_deps:
+ return self.agent_config_deps[host].get('deps', []), \
+ self.agent_config_deps[host].get('last_config', None)
+ return None, None
+
+ def messaging_agent(self, host: str) -> bool:
+ if host not in self.sending_agent_message or not self.sending_agent_message[host]:
+ return False
+ return True
+
+ def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
+ # agent successfully received new config. Update config/deps
+ assert daemon_spec.service_name == 'agent'
+ self.update_agent_config_deps(
+ daemon_spec.host, daemon_spec.deps, datetime_now())
+ self.agent_timestamp[daemon_spec.host] = datetime_now()
+ self.agent_counter[daemon_spec.host] = 1
+ self.save_agent(daemon_spec.host)
+
+
class EventStore():
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None