diff --git a/ceph/src/pybind/mgr/cephadm/inventory.py b/ceph/src/pybind/mgr/cephadm/inventory.py
index 5c69e1fc09a477d39e1d4ccf42da2a400c857a23..dbdedd477780c5a607c06ae2ab2a0c73fe68d368 100644
--- a/ceph/src/pybind/mgr/cephadm/inventory.py
+++ b/ceph/src/pybind/mgr/cephadm/inventory.py
@@ -12,6 +12,7 @@ from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
 from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
 from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
+from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
 
 from .utils import resolve_ip
 from .migrations import queue_migrate_nfs_spec
@@ -24,6 +25,7 @@ logger = logging.getLogger(__name__)
 
 HOST_CACHE_PREFIX = "host."
 SPEC_STORE_PREFIX = "spec."
+AGENT_CACHE_PREFIX = 'agent.'
 
 
 class Inventory:
@@ -142,14 +144,6 @@ class Inventory:
         self.assert_host(host)
         return self._inventory[host].get('addr', host)
 
-    def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
-        for h, hostspec in self._inventory.items():
-            if not label or label in hostspec.get('labels', []):
-                if as_hostspec:
-                    yield self.spec_from_dict(hostspec)
-                else:
-                    yield h
-
     def spec_from_dict(self, info: dict) -> HostSpec:
         hostname = info['hostname']
         return HostSpec(
@@ -446,10 +440,12 @@ class HostCache():
         self.osdspec_previews = {}     # type: Dict[str, List[Dict[str, Any]]]
         self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
         self.networks = {}             # type: Dict[str, Dict[str, Dict[str, List[str]]]]
+        self.last_network_update = {}   # type: Dict[str, datetime.datetime]
         self.last_device_update = {}   # type: Dict[str, datetime.datetime]
         self.last_device_change = {}   # type: Dict[str, datetime.datetime]
         self.daemon_refresh_queue = []  # type: List[str]
         self.device_refresh_queue = []  # type: List[str]
+        self.network_refresh_queue = []  # type: List[str]
         self.osdspec_previews_refresh_queue = []  # type: List[str]
 
         # host -> daemon name -> dict
@@ -461,6 +457,8 @@ class HostCache():
 
         self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}
 
+        self.metadata_up_to_date = {}  # type: Dict[str, bool]
+
     def load(self):
         # type: () -> None
         for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
@@ -480,6 +478,7 @@ class HostCache():
                 # for services, we ignore the persisted last_*_update
                 # and always trigger a new scrape on mgr restart.
                 self.daemon_refresh_queue.append(host)
+                self.network_refresh_queue.append(host)
                 self.daemons[host] = {}
                 self.osdspec_previews[host] = []
                 self.osdspec_last_applied[host] = {}
@@ -506,6 +505,7 @@ class HostCache():
                     self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                 self.registry_login_queue.add(host)
                 self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})
+                self.metadata_up_to_date[host] = j.get('metadata_up_to_date', False)
 
                 self.mgr.log.debug(
                     'HostCache.load: host %s has %d daemons, '
@@ -545,11 +545,10 @@ class HostCache():
             return True
         return False
 
-    def update_host_devices_networks(
+    def update_host_devices(
             self,
             host: str,
             dls: List[inventory.Device],
-            nets: Dict[str, Dict[str, List[str]]]
     ) -> None:
         if (
                 host not in self.devices
@@ -559,7 +558,14 @@ class HostCache():
             self.last_device_change[host] = datetime_now()
         self.last_device_update[host] = datetime_now()
         self.devices[host] = dls
+
+    def update_host_networks(
+            self,
+            host: str,
+            nets: Dict[str, Dict[str, List[str]]]
+    ) -> None:
         self.networks[host] = nets
+        self.last_network_update[host] = datetime_now()
 
     def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
         self.daemon_config_deps[host][name] = {
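
For context: device and network inventory are now refreshed and timestamped independently. A minimal sketch of a caller under that split (hypothetical; in cephadm the real driver is the serve loop, and gather_devices/gather_networks stand in for whatever actually scrapes the host):

    def refresh_host_inventory(cache, host, gather_devices, gather_networks):
        # type: (HostCache, str, Callable, Callable) -> None
        # Devices and networks have separate "needs refresh" checks, update
        # methods and last_*_update timestamps, so either can be refreshed alone.
        if cache.host_needs_device_refresh(host):
            cache.update_host_devices(host, gather_devices(host))
        if cache.host_needs_network_refresh(host):
            cache.update_host_networks(host, gather_networks(host))
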
@@ -606,6 +612,7 @@ class HostCache():
         self.daemon_config_deps[host] = {}
         self.daemon_refresh_queue.append(host)
         self.device_refresh_queue.append(host)
+        self.network_refresh_queue.append(host)
         self.osdspec_previews_refresh_queue.append(host)
         self.registry_login_queue.add(host)
         self.last_client_files[host] = {}
@@ -635,6 +642,13 @@ class HostCache():
             del self.last_device_update[host]
         self.mgr.event.set()
 
+    def invalidate_host_networks(self, host):
+        # type: (str) -> None
+        self.network_refresh_queue.append(host)
+        if host in self.last_network_update:
+            del self.last_network_update[host]
+        self.mgr.event.set()
+
     def distribute_new_registry_login_info(self) -> None:
         self.registry_login_queue = set(self.mgr.inventory.keys())
 
@@ -650,6 +664,8 @@ class HostCache():
             j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
         if host in self.last_device_update:
             j['last_device_update'] = datetime_to_str(self.last_device_update[host])
+        if host in self.last_network_update:
+            j['last_network_update'] = datetime_to_str(self.last_network_update[host])
         if host in self.last_device_change:
             j['last_device_change'] = datetime_to_str(self.last_device_change[host])
         if host in self.daemons:
@@ -679,6 +695,8 @@ class HostCache():
             j['last_client_files'] = self.last_client_files[host]
         if host in self.scheduled_daemon_actions:
             j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
+        if host in self.metadata_up_to_date:
+            j['metadata_up_to_date'] = self.metadata_up_to_date[host]
 
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
 
@@ -706,6 +724,8 @@ class HostCache():
             del self.last_daemon_update[host]
         if host in self.last_device_update:
             del self.last_device_update[host]
+        if host in self.last_network_update:
+            del self.last_network_update[host]
         if host in self.last_device_change:
             del self.last_device_change[host]
         if host in self.daemon_config_deps:
@@ -718,36 +738,87 @@ class HostCache():
 
     def get_hosts(self):
         # type: () -> List[str]
-        r = []
-        for host, di in self.daemons.items():
-            r.append(host)
-        return r
+        return list(self.daemons)
+
+    def get_schedulable_hosts(self) -> List[HostSpec]:
+        """
+        Returns all usable hosts that went through _refresh_host_daemons().
+
+        This mitigates a potential race, where a new host was added *after*
+        ``_refresh_host_daemons()`` was called, but *before*
+        ``_apply_all_specs()`` was called. Otherwise we could end up with
+        hosts where daemons might be running, but we have not yet detected them.
+        """
+        return [
+            h for h in self.mgr.inventory.all_specs()
+            if (
+                self.host_had_daemon_refresh(h.hostname)
+                and '_no_schedule' not in h.labels
+            )
+        ]
+
+    def get_non_draining_hosts(self) -> List[HostSpec]:
+        """
+        Returns all hosts that do not have the _no_schedule label.
+
+        Useful for the agent, which needs this list rather than
+        get_schedulable_hosts() since the agent must also be deployed on
+        hosts that have not yet had a daemon refresh.
+        """
+        return [
+            h for h in self.mgr.inventory.all_specs() if '_no_schedule' not in h.labels
+        ]
+
+    def get_unreachable_hosts(self) -> List[HostSpec]:
+        """
+        Return all hosts that are offline or in maintenance mode.
+
+        The idea is that we should not touch the daemons on these hosts
+        (in theory the hosts are inaccessible, so we CAN'T touch them), but
+        we still want to count their daemons toward placement so that they
+        aren't simply rescheduled elsewhere.
+        """
+        return [
+            h for h in self.mgr.inventory.all_specs()
+            if (
+                h.status.lower() in ['maintenance', 'offline']
+                or h.hostname in self.mgr.offline_hosts
+            )
+        ]
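
The three host views serve different consumers. A hedged sketch of how a caller might combine them (these helper functions are hypothetical):

    def placement_candidates(cache, service_type):
        # type: (HostCache, str) -> List[HostSpec]
        # Agents get deployed even on hosts without a completed daemon refresh;
        # every other service waits for get_schedulable_hosts().
        if service_type == 'agent':
            return cache.get_non_draining_hosts()
        return cache.get_schedulable_hosts()

    def hosts_not_to_touch(cache):
        # type: (HostCache) -> Set[str]
        # Daemons here still count toward placement, but must not be acted on.
        return {h.hostname for h in cache.get_unreachable_hosts()}
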
 
     def get_facts(self, host: str) -> Dict[str, Any]:
         return self.facts.get(host, {})
 
+    def _get_daemons(self) -> Iterator[orchestrator.DaemonDescription]:
+        for dm in self.daemons.copy().values():
+            yield from dm.values()
+
     def get_daemons(self):
         # type: () -> List[orchestrator.DaemonDescription]
+        return list(self._get_daemons())
+
+    def get_error_daemons(self) -> List[orchestrator.DaemonDescription]:
         r = []
-        for host, dm in self.daemons.items():
-            for name, dd in dm.items():
+        for dd in self._get_daemons():
+            if dd.status is not None and dd.status == orchestrator.DaemonDescriptionStatus.error:
                 r.append(dd)
         return r
 
     def get_daemons_by_host(self, host: str) -> List[orchestrator.DaemonDescription]:
         return list(self.daemons.get(host, {}).values())
 
-    def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
+    def get_daemon(self, daemon_name: str, host: Optional[str] = None) -> orchestrator.DaemonDescription:
         assert not daemon_name.startswith('ha-rgw.')
-        for _, dm in self.daemons.items():
-            for _, dd in dm.items():
-                if dd.name() == daemon_name:
-                    return dd
+        dds = self.get_daemons_by_host(host) if host else self._get_daemons()
+        for dd in dds:
+            if dd.name() == daemon_name:
+                return dd
+
         raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
 
-    def has_daemon(self, daemon_name: str) -> bool:
+    def has_daemon(self, daemon_name: str, host: Optional[str] = None) -> bool:
         try:
-            self.get_daemon(daemon_name)
+            self.get_daemon(daemon_name, host)
         except orchestrator.OrchestratorError:
             return False
         return True
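
With the new optional host argument the lookup is confined to that host's daemon map instead of scanning every host. A small usage sketch (the wrapper function itself is hypothetical):

    def describe_daemon(mgr, daemon_name, host=None):
        # type: (CephadmOrchestrator, str, Optional[str]) -> str
        if not mgr.cache.has_daemon(daemon_name, host):
            return '%s not found' % daemon_name
        dd = mgr.cache.get_daemon(daemon_name, host)
        return '%s on %s: %s' % (dd.name(), dd.hostname, dd.status_desc)
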
@@ -762,11 +833,10 @@ class HostCache():
                 # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
                 # could be wrong. We must assume maintenance is working and daemons are stopped
                 dd.status = orchestrator.DaemonDescriptionStatus.stopped
-                dd.status_desc = 'stopped'
             dd.events = self.mgr.events.get_for_daemon(dd.name())
             return dd
 
-        for host, dm in self.daemons.items():
+        for host, dm in self.daemons.copy().items():
             yield host, {name: alter(host, d) for name, d in dm.items()}
 
     def get_daemons_by_service(self, service_name):
@@ -774,39 +844,22 @@ class HostCache():
         assert not service_name.startswith('keepalived.')
         assert not service_name.startswith('haproxy.')
 
-        result = []   # type: List[orchestrator.DaemonDescription]
-        for host, dm in self.daemons.items():
-            for name, d in dm.items():
-                if d.service_name() == service_name:
-                    result.append(d)
-        return result
+        return list(dd for dd in self._get_daemons() if dd.service_name() == service_name)
 
-    def get_daemons_by_type(self, service_type):
-        # type: (str) -> List[orchestrator.DaemonDescription]
+    def get_daemons_by_type(self, service_type: str, host: str = '') -> List[orchestrator.DaemonDescription]:
         assert service_type not in ['keepalived', 'haproxy']
 
-        result = []   # type: List[orchestrator.DaemonDescription]
-        for host, dm in self.daemons.items():
-            for name, d in dm.items():
-                if d.daemon_type in service_to_daemon_types(service_type):
-                    result.append(d)
-        return result
+        daemons = self.daemons[host].values() if host else self._get_daemons()
 
-    def get_daemon_types(self, hostname: str) -> List[str]:
+        return [d for d in daemons if d.daemon_type in service_to_daemon_types(service_type)]
+
+    def get_daemon_types(self, hostname: str) -> Set[str]:
         """Provide a list of the types of daemons on the host"""
-        result = set()
-        for _d, dm in self.daemons[hostname].items():
-            assert dm.daemon_type is not None, f'no daemon type for {dm!r}'
-            result.add(dm.daemon_type)
-        return list(result)
+        return cast(Set[str], {d.daemon_type for d in self.daemons[hostname].values()})
 
     def get_daemon_names(self):
         # type: () -> List[str]
-        r = []
-        for host, dm in self.daemons.items():
-            for name, dd in dm.items():
-                r.append(name)
-        return r
+        return [d.name() for d in self._get_daemons()]
 
     def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
         if host in self.daemon_config_deps:
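
The list-building loops above are now thin wrappers around the _get_daemons() generator, which iterates over a copy of the per-host dict so concurrent updates don't break iteration. Additional filters can be layered the same way; a hypothetical extra method in the style of get_error_daemons():

    def get_stopped_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        return [dd for dd in self._get_daemons()
                if dd.status == orchestrator.DaemonDescriptionStatus.stopped]
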
@@ -830,6 +883,8 @@ class HostCache():
             seconds=self.mgr.daemon_cache_timeout)
         if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
             return True
+        if not self.mgr.cache.host_metadata_up_to_date(host):
+            return True
         return False
 
     def host_needs_facts_refresh(self, host):
@@ -841,6 +896,8 @@ class HostCache():
             seconds=self.mgr.facts_cache_timeout)
         if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
             return True
+        if not self.mgr.cache.host_metadata_up_to_date(host):
+            return True
         return False
 
     def host_needs_autotune_memory(self, host):
@@ -876,6 +933,24 @@ class HostCache():
             seconds=self.mgr.device_cache_timeout)
         if host not in self.last_device_update or self.last_device_update[host] < cutoff:
             return True
+        if not self.mgr.cache.host_metadata_up_to_date(host):
+            return True
+        return False
+
+    def host_needs_network_refresh(self, host):
+        # type: (str) -> bool
+        if host in self.mgr.offline_hosts:
+            logger.debug(f'Host "{host}" marked as offline. Skipping network refresh')
+            return False
+        if host in self.network_refresh_queue:
+            self.network_refresh_queue.remove(host)
+            return True
+        cutoff = datetime_now() - datetime.timedelta(
+            seconds=self.mgr.device_cache_timeout)
+        if host not in self.last_network_update or self.last_network_update[host] < cutoff:
+            return True
+        if not self.mgr.cache.host_metadata_up_to_date(host):
+            return True
         return False
 
     def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
@@ -917,6 +992,21 @@ class HostCache():
             return True
         return False
 
+    def host_metadata_up_to_date(self, host: str) -> bool:
+        if host not in self.metadata_up_to_date or not self.metadata_up_to_date[host]:
+            return False
+        return True
+
+    def all_host_metadata_up_to_date(self) -> bool:
+        unreachables = [h.hostname for h in self.get_unreachable_hosts()]
+        if [h for h in self.get_hosts() if (not self.host_metadata_up_to_date(h) and h not in unreachables)]:
+            # this function is primarily for telling whether it's safe to try to apply a
+            # service spec. Since offline/maintenance hosts aren't considered in that
+            # process anyway, we don't want to return False just because a host without
+            # up-to-date metadata falls into one of those two categories.
+            return False
+        return True
+
     def add_daemon(self, host, dd):
         # type: (str, orchestrator.DaemonDescription) -> None
         assert host in self.daemons
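
As the inline comment notes, all_host_metadata_up_to_date() is primarily a gate for applying service specs. A hypothetical sketch of that gating (the real call site lives elsewhere in cephadm):

    def maybe_apply_all_specs(mgr, apply_fn):
        # type: (CephadmOrchestrator, Callable[[], None]) -> None
        if not mgr.cache.all_host_metadata_up_to_date():
            mgr.log.debug('host metadata not yet up to date; deferring spec application')
            return
        apply_fn()
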
@@ -960,12 +1050,15 @@ class HostCache():
             self.scheduled_daemon_actions[host] = {}
         self.scheduled_daemon_actions[host][daemon_name] = action
 
-    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
+    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> bool:
+        found = False
         if host in self.scheduled_daemon_actions:
             if daemon_name in self.scheduled_daemon_actions[host]:
                 del self.scheduled_daemon_actions[host][daemon_name]
+                found = True
             if not self.scheduled_daemon_actions[host]:
                 del self.scheduled_daemon_actions[host]
+        return found
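
The bool return value lets callers distinguish "removed" from "nothing was scheduled", e.g. (hypothetical caller):

    if mgr.cache.rm_scheduled_daemon_action(host, daemon_name):
        mgr.log.debug('cleared scheduled action for %s on %s' % (daemon_name, host))
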
 
     def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
         assert not daemon.startswith('ha-rgw.')
@@ -973,6 +1066,95 @@ class HostCache():
         return self.scheduled_daemon_actions.get(host, {}).get(daemon)
 
 
+class AgentCache():
+    """
+    AgentCache is used for storing metadata about agent daemons that must be
+    kept through MGR failovers.
+    """
+
+    def __init__(self, mgr):
+        # type: (CephadmOrchestrator) -> None
+        self.mgr: CephadmOrchestrator = mgr
+        self.agent_config_deps = {}   # type: Dict[str, Dict[str,Any]]
+        self.agent_counter = {}  # type: Dict[str, int]
+        self.agent_timestamp = {}  # type: Dict[str, datetime.datetime]
+        self.agent_keys = {}  # type: Dict[str, str]
+        self.agent_ports = {}  # type: Dict[str, int]
+        self.sending_agent_message = {}  # type: Dict[str, bool]
+
+    def load(self):
+        # type: () -> None
+        for k, v in self.mgr.get_store_prefix(AGENT_CACHE_PREFIX).items():
+            host = k[len(AGENT_CACHE_PREFIX):]
+            if host not in self.mgr.inventory:
+                self.mgr.log.warning('removing stray AgentCache record for agent on %s' % (
+                    host))
+                self.mgr.set_store(k, None)
+            try:
+                j = json.loads(v)
+                self.agent_config_deps[host] = {}
+                conf_deps = j.get('agent_config_deps', {})
+                if conf_deps:
+                    conf_deps['last_config'] = str_to_datetime(conf_deps['last_config'])
+                self.agent_config_deps[host] = conf_deps
+                self.agent_counter[host] = int(j.get('agent_counter', 1))
+                self.agent_timestamp[host] = str_to_datetime(
+                    j.get('agent_timestamp', datetime_to_str(datetime_now())))
+                self.agent_keys[host] = str(j.get('agent_keys', ''))
+                agent_port = int(j.get('agent_ports', 0))
+                if agent_port:
+                    self.agent_ports[host] = agent_port
+
+            except Exception as e:
+                self.mgr.log.warning('unable to load cached state for agent on host %s: %s' % (
+                    host, e))
+                pass
+
+    def save_agent(self, host: str) -> None:
+        j: Dict[str, Any] = {}
+        if host in self.agent_config_deps:
+            j['agent_config_deps'] = {
+                'deps': self.agent_config_deps[host].get('deps', []),
+                'last_config': datetime_to_str(self.agent_config_deps[host]['last_config']),
+            }
+        if host in self.agent_counter:
+            j['agent_counter'] = self.agent_counter[host]
+        if host in self.agent_keys:
+            j['agent_keys'] = self.agent_keys[host]
+        if host in self.agent_ports:
+            j['agent_ports'] = self.agent_ports[host]
+        if host in self.agent_timestamp:
+            j['agent_timestamp'] = datetime_to_str(self.agent_timestamp[host])
+
+        self.mgr.set_store(AGENT_CACHE_PREFIX + host, json.dumps(j))
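
For reference, this writes one "agent.<hostname>" key per host; the JSON that load() reads back has roughly this shape (field values illustrative, timestamps serialized with datetime_to_str()):

    example_record = {
        'agent_config_deps': {'deps': ['<dep hash>'], 'last_config': '<timestamp>'},
        'agent_counter': 3,
        'agent_keys': '<agent key string>',
        'agent_ports': 12150,  # illustrative; only persisted once a port is known
        'agent_timestamp': '<timestamp>',
    }
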
+
+    def update_agent_config_deps(self, host: str, deps: List[str], stamp: datetime.datetime) -> None:
+        self.agent_config_deps[host] = {
+            'deps': deps,
+            'last_config': stamp,
+        }
+
+    def get_agent_last_config_deps(self, host: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
+        if host in self.agent_config_deps:
+            return self.agent_config_deps[host].get('deps', []), \
+                self.agent_config_deps[host].get('last_config', None)
+        return None, None
+
+    def messaging_agent(self, host: str) -> bool:
+        if host not in self.sending_agent_message or not self.sending_agent_message[host]:
+            return False
+        return True
+
+    def agent_config_successfully_delivered(self, daemon_spec: CephadmDaemonDeploySpec) -> None:
+        # agent successfully received new config. Update config/deps
+        assert daemon_spec.service_name == 'agent'
+        self.update_agent_config_deps(
+            daemon_spec.host, daemon_spec.deps, datetime_now())
+        self.agent_timestamp[daemon_spec.host] = datetime_now()
+        self.agent_counter[daemon_spec.host] = 1
+        self.save_agent(daemon_spec.host)
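
A hedged sketch of how a sender might tie these pieces together (the function and its deliver callback are hypothetical; the real messaging code lives in the agent helper, not in this file):

    def send_agent_config(cache, daemon_spec, deliver):
        # type: (AgentCache, CephadmDaemonDeploySpec, Callable) -> None
        if cache.messaging_agent(daemon_spec.host):
            return  # a message to this agent is already in flight
        cache.sending_agent_message[daemon_spec.host] = True
        try:
            deliver(daemon_spec)  # placeholder for the actual transport
            cache.agent_config_successfully_delivered(daemon_spec)
        finally:
            cache.sending_agent_message[daemon_spec.host] = False
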
+
+
 class EventStore():
     def __init__(self, mgr):
         # type: (CephadmOrchestrator) -> None