Subject: import 15.2.9

diff --git a/ceph/src/pybind/mgr/cephadm/inventory.py b/ceph/src/pybind/mgr/cephadm/inventory.py
index 9e9345fe2e313d752fdc7b6732812e1beb8c71a5..1629738444b834dda71583c70fa7cab251a34a19 100644
--- a/ceph/src/pybind/mgr/cephadm/inventory.py
+++ b/ceph/src/pybind/mgr/cephadm/inventory.py
@@ -9,7 +9,7 @@ import six
 import orchestrator
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec
-from cephadm.utils import str_to_datetime, datetime_to_str
+from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
 from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent
 
 if TYPE_CHECKING:
@@ -23,12 +23,20 @@ SPEC_STORE_PREFIX = "spec."
 
 
 class Inventory:
+    """
+    The inventory persistently stores a HostSpec for each host.
+    """
+
     def __init__(self, mgr: 'CephadmOrchestrator'):
         self.mgr = mgr
         # load inventory
         i = self.mgr.get_store('inventory')
         if i:
             self._inventory: Dict[str, dict] = json.loads(i)
+            # handle old clusters missing 'hostname' key from hostspec
+            for k, v in self._inventory.items():
+                if 'hostname' not in v:
+                    v['hostname'] = k
         else:
             self._inventory = dict()
         logger.debug('Loaded inventory %s' % self._inventory)
@@ -39,25 +47,25 @@ class Inventory:
     def __contains__(self, host: str) -> bool:
         return host in self._inventory
 
-    def assert_host(self, host):
+    def assert_host(self, host: str) -> None:
         if host not in self._inventory:
             raise OrchestratorError('host %s does not exist' % host)
 
-    def add_host(self, spec: HostSpec):
+    def add_host(self, spec: HostSpec) -> None:
         self._inventory[spec.hostname] = spec.to_json()
         self.save()
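+        # Rough usage sketch (illustrative values):
+        #   inventory.add_host(HostSpec('host1', addr='10.0.0.1', labels=['mon']))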
 
-    def rm_host(self, host: str):
+    def rm_host(self, host: str) -> None:
         self.assert_host(host)
         del self._inventory[host]
         self.save()
 
-    def set_addr(self, host, addr):
+    def set_addr(self, host: str, addr: str) -> None:
         self.assert_host(host)
         self._inventory[host]['addr'] = addr
         self.save()
 
-    def add_label(self, host, label):
+    def add_label(self, host: str, label: str) -> None:
         self.assert_host(host)
 
         if 'labels' not in self._inventory[host]:
@@ -66,7 +74,7 @@ class Inventory:
             self._inventory[host]['labels'].append(label)
         self.save()
 
-    def rm_label(self, host, label):
+    def rm_label(self, host: str, label: str) -> None:
         self.assert_host(host)
 
         if 'labels' not in self._inventory[host]:
@@ -75,7 +83,7 @@ class Inventory:
             self._inventory[host]['labels'].remove(label)
         self.save()
 
-    def get_addr(self, host) -> str:
+    def get_addr(self, host: str) -> str:
         self.assert_host(host)
         return self._inventory[host].get('addr', host)
 
@@ -87,7 +95,7 @@ class Inventory:
                 else:
                     yield h
 
-    def spec_from_dict(self, info) -> HostSpec:
+    def spec_from_dict(self, info: dict) -> HostSpec:
         hostname = info['hostname']
         return HostSpec(
             hostname,
@@ -99,7 +107,7 @@ class Inventory:
     def all_specs(self) -> List[HostSpec]:
         return list(map(self.spec_from_dict, self._inventory.values()))
 
-    def save(self):
+    def save(self) -> None:
         self.mgr.set_store('inventory', json.dumps(self._inventory))
 
 
@@ -134,7 +142,7 @@ class SpecStore():
             self.spec_preview[spec.service_name()] = spec
             return None
         self.specs[spec.service_name()] = spec
-        self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
+        self.spec_created[spec.service_name()] = datetime_now()
         self.mgr.set_store(
             SPEC_STORE_PREFIX + spec.service_name(),
             json.dumps({
@@ -166,12 +174,44 @@ class SpecStore():
 
 
 class HostCache():
+    """
+    HostCache stores different things:
+
+    1. `daemons`: Deployed daemons O(daemons)
+
+    They're part of the configuration nowadays and need to be
+    persistent. The name "daemon cache" is unfortunately a bit
+    misleading: for example, we need to know where daemons are
+    deployed even on hosts that are offline.
+
+    2. `devices`: ceph-volume inventory cache O(hosts)
+
+    As soon as this is populated, it becomes more or less read-only.
+
+    3. `networks`: network interfaces for each host. O(hosts)
+
+    This is needed in order to deploy MONs. It is mostly read-only.
+
+    4. `last_etc_ceph_ceph_conf` O(hosts)
+
+    Stores the last refresh time of /etc/ceph/ceph.conf. Used
+    to avoid deploying new configs when failing over to a new mgr.
+
+    5. `scheduled_daemon_actions`: O(daemons)
+
+    Used to run daemon actions after deploying a daemon. We need to
+    store them persistently in order to stay consistent across
+    MGR failovers.
+    """
+
     def __init__(self, mgr):
         # type: (CephadmOrchestrator) -> None
         self.mgr: CephadmOrchestrator = mgr
         self.daemons = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
         self.last_daemon_update = {}   # type: Dict[str, datetime.datetime]
         self.devices = {}              # type: Dict[str, List[inventory.Device]]
+        self.facts = {}                # type: Dict[str, Dict[str, Any]]
+        self.last_facts_update = {}    # type: Dict[str, datetime.datetime]
         self.osdspec_previews = {}     # type: Dict[str, List[Dict[str, Any]]]
         self.networks = {}             # type: Dict[str, Dict[str, List[str]]]
         self.last_device_update = {}   # type: Dict[str, datetime.datetime]
@@ -244,15 +284,20 @@ class HostCache():
     def update_host_daemons(self, host, dm):
         # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
         self.daemons[host] = dm
-        self.last_daemon_update[host] = datetime.datetime.utcnow()
+        self.last_daemon_update[host] = datetime_now()
+
+    def update_host_facts(self, host, facts):
+        # type: (str, Dict[str, Dict[str, Any]]) -> None
+        self.facts[host] = facts
+        self.last_facts_update[host] = datetime_now()
 
     def update_host_devices_networks(self, host, dls, nets):
         # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
         self.devices[host] = dls
         self.networks[host] = nets
-        self.last_device_update[host] = datetime.datetime.utcnow()
+        self.last_device_update[host] = datetime_now()
 
-    def update_daemon_config_deps(self, host, name, deps, stamp):
+    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
         self.daemon_config_deps[host][name] = {
             'deps': deps,
             'last_config': stamp,
@@ -260,7 +305,7 @@ class HostCache():
 
     def update_last_host_check(self, host):
         # type: (str) -> None
-        self.last_host_check[host] = datetime.datetime.utcnow()
+        self.last_host_check[host] = datetime_now()
 
     def prime_empty_host(self, host):
         # type: (str) -> None
@@ -291,7 +336,7 @@ class HostCache():
             del self.last_device_update[host]
         self.mgr.event.set()
 
-    def distribute_new_registry_login_info(self):
+    def distribute_new_registry_login_info(self) -> None:
         self.registry_login_queue = set(self.mgr.inventory.keys())
 
     def save_host(self, host: str) -> None:
@@ -305,17 +350,21 @@ class HostCache():
             j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
         if host in self.last_device_update:
             j['last_device_update'] = datetime_to_str(self.last_device_update[host])
-        for name, dd in self.daemons[host].items():
-            j['daemons'][name] = dd.to_json()
-        for d in self.devices[host]:
-            j['devices'].append(d.to_json())
-        j['networks'] = self.networks[host]
-        for name, depi in self.daemon_config_deps[host].items():
-            j['daemon_config_deps'][name] = {
-                'deps': depi.get('deps', []),
-                'last_config': datetime_to_str(depi['last_config']),
-            }
-        if self.osdspec_previews[host]:
+        if host in self.daemons:
+            for name, dd in self.daemons[host].items():
+                j['daemons'][name] = dd.to_json()
+        if host in self.devices:
+            for d in self.devices[host]:
+                j['devices'].append(d.to_json())
+        if host in self.networks:
+            j['networks'] = self.networks[host]
+        if host in self.daemon_config_deps:
+            for name, depi in self.daemon_config_deps[host].items():
+                j['daemon_config_deps'][name] = {
+                    'deps': depi.get('deps', []),
+                    'last_config': datetime_to_str(depi['last_config']),
+                }
+        if host in self.osdspec_previews and self.osdspec_previews[host]:
             j['osdspec_previews'] = self.osdspec_previews[host]
 
         if host in self.last_host_check:
@@ -323,7 +372,7 @@ class HostCache():
 
         if host in self.last_etc_ceph_ceph_conf:
             j['last_etc_ceph_ceph_conf'] = datetime_to_str(self.last_etc_ceph_ceph_conf[host])
-        if self.scheduled_daemon_actions.get(host, {}):
+        if host in self.scheduled_daemon_actions:
             j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
 
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
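+        # Illustrative shape of the stored entry, assuming HOST_CACHE_PREFIX
+        # is 'host.' and datetime_to_str emits ISO-8601 UTC strings:
+        #   'host.host1' -> '{"daemons": {...}, "devices": [...],
+        #                     "networks": {...}, "daemon_config_deps": {...},
+        #                     "last_daemon_update": "2021-01-01T00:00:00.000000Z"}'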
@@ -334,6 +383,10 @@ class HostCache():
             del self.daemons[host]
         if host in self.devices:
             del self.devices[host]
+        if host in self.facts:
+            del self.facts[host]
+        if host in self.last_facts_update:
+            del self.last_facts_update[host]
         if host in self.osdspec_previews:
             del self.osdspec_previews[host]
         if host in self.loading_osdspec_preview:
@@ -373,7 +426,7 @@ class HostCache():
         raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
 
     def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
-        def alter(host, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
+        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
             dd = copy(dd_orig)
             if host in self.mgr.offline_hosts:
                 dd.status = -1
@@ -410,7 +463,7 @@ class HostCache():
                 r.append(name)
         return r
 
-    def get_daemon_last_config_deps(self, host, name) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
+    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
         if host in self.daemon_config_deps:
             if name in self.daemon_config_deps[host]:
                 return self.daemon_config_deps[host][name].get('deps', []), \
@@ -425,12 +478,23 @@ class HostCache():
         if host in self.daemon_refresh_queue:
             self.daemon_refresh_queue.remove(host)
             return True
-        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+        cutoff = datetime_now() - datetime.timedelta(
             seconds=self.mgr.daemon_cache_timeout)
         if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
             return True
         return False
 
+    def host_needs_facts_refresh(self, host):
+        # type: (str) -> bool
+        if host in self.mgr.offline_hosts:
+            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
+            return False
+        cutoff = datetime_now() - datetime.timedelta(
+            seconds=self.mgr.facts_cache_timeout)
+        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
+            return True
+        return False
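+
+    # Rough caller sketch (hypothetical; the refresh loop would gather facts
+    # for the host and record them):
+    #   if cache.host_needs_facts_refresh(host):
+    #       cache.update_host_facts(host, gathered_facts)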
+
     def host_had_daemon_refresh(self, host: str) -> bool:
         """
         ... at least once.
@@ -449,13 +513,13 @@ class HostCache():
         if host in self.device_refresh_queue:
             self.device_refresh_queue.remove(host)
             return True
-        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+        cutoff = datetime_now() - datetime.timedelta(
             seconds=self.mgr.device_cache_timeout)
         if host not in self.last_device_update or self.last_device_update[host] < cutoff:
             return True
         return False
 
-    def host_needs_osdspec_preview_refresh(self, host):
+    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
         if host in self.mgr.offline_hosts:
             logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
             return False
@@ -468,11 +532,11 @@ class HostCache():
 
     def host_needs_check(self, host):
         # type: (str) -> bool
-        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+        cutoff = datetime_now() - datetime.timedelta(
             seconds=self.mgr.host_check_interval)
         return host not in self.last_host_check or self.last_host_check[host] < cutoff
 
-    def host_needs_new_etc_ceph_ceph_conf(self, host: str):
+    def host_needs_new_etc_ceph_ceph_conf(self, host: str) -> bool:
         if not self.mgr.manage_etc_ceph_ceph_conf:
             return False
         if self.mgr.paused:
@@ -490,10 +554,10 @@ class HostCache():
         # already up to date:
         return False
 
-    def update_last_etc_ceph_ceph_conf(self, host: str):
+    def update_last_etc_ceph_ceph_conf(self, host: str) -> None:
         if not self.mgr.last_monmap:
             return
-        self.last_etc_ceph_ceph_conf[host] = datetime.datetime.utcnow()
+        self.last_etc_ceph_ceph_conf[host] = datetime_now()
 
     def host_needs_registry_login(self, host: str) -> bool:
         if host in self.mgr.offline_hosts:
@@ -508,12 +572,12 @@ class HostCache():
         assert host in self.daemons
         self.daemons[host][dd.name()] = dd
 
-    def rm_daemon(self, host, name):
+    def rm_daemon(self, host: str, name: str) -> None:
         if host in self.daemons:
             if name in self.daemons[host]:
                 del self.daemons[host][name]
 
-    def daemon_cache_filled(self):
+    def daemon_cache_filled(self) -> bool:
         """
         i.e. we have checked the daemons for each host at least once,
         excluding offline hosts.
@@ -524,7 +588,7 @@ class HostCache():
         return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                    for h in self.get_hosts())
 
-    def schedule_daemon_action(self, host: str, daemon_name: str, action: str):
+    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
         priorities = {
             'start': 1,
             'restart': 2,
@@ -542,14 +606,14 @@ class HostCache():
             self.scheduled_daemon_actions[host] = {}
         self.scheduled_daemon_actions[host][daemon_name] = action
 
-    def rm_scheduled_daemon_action(self, host: str, daemon_name: str):
+    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
         if host in self.scheduled_daemon_actions:
             if daemon_name in self.scheduled_daemon_actions[host]:
                 del self.scheduled_daemon_actions[host][daemon_name]
             if not self.scheduled_daemon_actions[host]:
                 del self.scheduled_daemon_actions[host]
 
-    def get_scheduled_daemon_action(self, host, daemon) -> Optional[str]:
+    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
         return self.scheduled_daemon_actions.get(host, {}).get(daemon)
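+
+    # scheduled_daemon_actions maps host -> daemon name -> action,
+    # e.g. (illustrative): {'host1': {'osd.3': 'redeploy'}}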
 
 
@@ -573,26 +637,26 @@ class EventStore():
         # limit to five events for now.
         self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
 
-    def for_service(self, spec: ServiceSpec, level, message) -> None:
-        e = OrchestratorEvent(datetime.datetime.utcnow(), 'service',
+    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
+        e = OrchestratorEvent(datetime_now(), 'service',
                               spec.service_name(), level, message)
         self.add(e)
 
-    def from_orch_error(self, e: OrchestratorError):
+    def from_orch_error(self, e: OrchestratorError) -> None:
         if e.event_subject is not None:
             self.add(OrchestratorEvent(
-                datetime.datetime.utcnow(),
+                datetime_now(),
                 e.event_subject[0],
                 e.event_subject[1],
                 "ERROR",
                 str(e)
             ))
 
-    def for_daemon(self, daemon_name, level, message):
-        e = OrchestratorEvent(datetime.datetime.utcnow(), 'daemon', daemon_name, level, message)
+    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
+        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
         self.add(e)
 
-    def for_daemon_from_exception(self, daemon_name, e: Exception):
+    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
         self.for_daemon(
             daemon_name,
             "ERROR",
@@ -617,8 +681,8 @@ class EventStore():
         for k_s in unknowns:
             del self.events[k_s]
 
-    def get_for_service(self, name) -> List[OrchestratorEvent]:
+    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
         return self.events.get('service:' + name, [])
 
-    def get_for_daemon(self, name) -> List[OrchestratorEvent]:
+    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
         return self.events.get('daemon:' + name, [])
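+
+    # Rough usage sketch (hypothetical wiring; the constructor signature is
+    # not shown in this excerpt):
+    #   events = EventStore(mgr)
+    #   events.for_daemon('osd.3', 'INFO', 'deployed on host1')
+    #   events.get_for_daemon('osd.3')  # -> [OrchestratorEvent, ...]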