]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/cephadm/inventory.py
import 15.2.5
[ceph.git] / ceph / src / pybind / mgr / cephadm / inventory.py
index e107ee4abb46586fe1b248c67941a315e323b766..3780e749f72d618209cc32bbb9545868dc5acfee 100644 (file)
@@ -9,7 +9,7 @@ import six
 import orchestrator
 from ceph.deployment import inventory
 from ceph.deployment.service_spec import ServiceSpec
-from orchestrator import OrchestratorError, HostSpec
+from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent
 
 if TYPE_CHECKING:
     from .module import CephadmOrchestrator
@@ -83,8 +83,9 @@ class Inventory:
         for h, hostspec in self._inventory.items():
             if not label or label in hostspec.get('labels', []):
                 if as_hostspec:
-                    yield hostspec
-                yield h
+                    yield self.spec_from_dict(hostspec)
+                else:
+                    yield h
 
     def spec_from_dict(self, info):
         hostname = info['hostname']
@@ -108,6 +109,7 @@ class SpecStore():
         self.mgr = mgr
         self.specs = {} # type: Dict[str, ServiceSpec]
         self.spec_created = {} # type: Dict[str, datetime.datetime]
+        self.spec_preview = {} # type: Dict[str, ServiceSpec]
 
     def load(self):
         # type: () -> None
@@ -128,6 +130,9 @@ class SpecStore():
 
     def save(self, spec):
         # type: (ServiceSpec) -> None
+        if spec.preview_only:
+            self.spec_preview[spec.service_name()] = spec
+            return None
         self.specs[spec.service_name()] = spec
         self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
         self.mgr.set_store(
@@ -137,6 +142,7 @@ class SpecStore():
                 'created': self.spec_created[spec.service_name()].strftime(DATEFMT),
             }, sort_keys=True),
         )
+        self.mgr.events.for_service(spec, OrchestratorEvent.INFO, 'service was created')
 
     def rm(self, service_name):
         # type: (str) -> bool
@@ -171,9 +177,13 @@ class HostCache():
         self.daemon_refresh_queue = [] # type: List[str]
         self.device_refresh_queue = [] # type: List[str]
         self.osdspec_previews_refresh_queue = [] # type: List[str]
+
+        # host -> daemon name -> dict
         self.daemon_config_deps = {}   # type: Dict[str, Dict[str, Dict[str,Any]]]
         self.last_host_check = {}      # type: Dict[str, datetime.datetime]
         self.loading_osdspec_preview = set()  # type: Set[str]
+        self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {}
+        self.registry_login_queue: Set[str] = set()
 
     def load(self):
         # type: () -> None
@@ -215,6 +225,10 @@ class HostCache():
                 if 'last_host_check' in j:
                     self.last_host_check[host] = datetime.datetime.strptime(
                         j['last_host_check'], DATEFMT)
+                if 'last_etc_ceph_ceph_conf' in j:
+                    self.last_etc_ceph_ceph_conf[host] = datetime.datetime.strptime(
+                        j['last_etc_ceph_ceph_conf'], DATEFMT)
+                self.registry_login_queue.add(host)
                 self.mgr.log.debug(
                     'HostCache.load: host %s has %d daemons, '
                     '%d devices, %d networks' % (
@@ -259,6 +273,7 @@ class HostCache():
         self.daemon_refresh_queue.append(host)
         self.device_refresh_queue.append(host)
         self.osdspec_previews_refresh_queue.append(host)
+        self.registry_login_queue.add(host)
 
     def invalidate_host_daemons(self, host):
         # type: (str) -> None
@@ -273,6 +288,9 @@ class HostCache():
         if host in self.last_device_update:
             del self.last_device_update[host]
         self.mgr.event.set()
+    
+    def distribute_new_registry_login_info(self):
+        self.registry_login_queue = set(self.mgr.inventory.keys())
 
     def save_host(self, host):
         # type: (str) -> None
@@ -301,6 +319,10 @@ class HostCache():
 
         if host in self.last_host_check:
             j['last_host_check'] = self.last_host_check[host].strftime(DATEFMT)
+
+        if host in self.last_etc_ceph_ceph_conf:
+            j['last_etc_ceph_ceph_conf'] = self.last_etc_ceph_ceph_conf[host].strftime(DATEFMT)
+
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
 
     def rm_host(self, host):
@@ -338,24 +360,40 @@ class HostCache():
                 r.append(dd)
         return r
 
+    def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
+        for _, dm in self.daemons.items():
+            for _, dd in dm.items():
+                if dd.name() == daemon_name:
+                    return dd
+        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
+
     def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
-        for host, dm in self.daemons.items():
+        def alter(host, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
+            dd = copy(dd_orig)
             if host in self.mgr.offline_hosts:
-                def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
-                    ret = copy(dd)
-                    ret.status = -1
-                    ret.status_desc = 'host is offline'
-                    return ret
-                yield host, {name: set_offline(d) for name, d in dm.items()}
-            else:
-                yield host, dm
+                dd.status = -1
+                dd.status_desc = 'host is offline'
+            dd.events = self.mgr.events.get_for_daemon(dd.name())
+            return dd
+
+        for host, dm in self.daemons.items():
+            yield host, {name: alter(host, d) for name, d in dm.items()}
 
     def get_daemons_by_service(self, service_name):
         # type: (str) -> List[orchestrator.DaemonDescription]
         result = []   # type: List[orchestrator.DaemonDescription]
         for host, dm in self.daemons.items():
             for name, d in dm.items():
-                if name.startswith(service_name + '.'):
+                if d.service_name() == service_name:
+                    result.append(d)
+        return result
+
+    def get_daemons_by_type(self, service_type):
+        # type: (str) -> List[orchestrator.DaemonDescription]
+        result = []   # type: List[orchestrator.DaemonDescription]
+        for host, dm in self.daemons.items():
+            for name, d in dm.items():
+                if d.daemon_type == service_type:
                     result.append(d)
         return result
 
@@ -419,6 +457,35 @@ class HostCache():
             seconds=self.mgr.host_check_interval)
         return host not in self.last_host_check or self.last_host_check[host] < cutoff
 
+    def host_needs_new_etc_ceph_ceph_conf(self, host: str):
+        if not self.mgr.manage_etc_ceph_ceph_conf:
+            return False
+        if self.mgr.paused:
+            return False
+        if host in self.mgr.offline_hosts:
+            return False
+        if not self.mgr.last_monmap:
+            return False
+        if host not in self.last_etc_ceph_ceph_conf:
+            return True
+        if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]:
+            return True
+        # already up to date:
+        return False
+    
+    def update_last_etc_ceph_ceph_conf(self, host: str):
+        if not self.mgr.last_monmap:
+            return
+        self.last_etc_ceph_ceph_conf[host] = self.mgr.last_monmap
+
+    def host_needs_registry_login(self, host):
+        if host in self.mgr.offline_hosts:
+            return False
+        if host in self.registry_login_queue:
+            self.registry_login_queue.remove(host)
+            return True
+        return False
+
     def add_daemon(self, host, dd):
         # type: (str, orchestrator.DaemonDescription) -> None
         assert host in self.daemons
@@ -427,4 +494,86 @@ class HostCache():
     def rm_daemon(self, host, name):
         if host in self.daemons:
             if name in self.daemons[host]:
-                del self.daemons[host][name]
\ No newline at end of file
+                del self.daemons[host][name]
+
+    def daemon_cache_filled(self):
+        """
+        i.e. we have checked the daemons for each hosts at least once.
+        excluding offline hosts.
+
+        We're not checking for `host_needs_daemon_refresh`, as this might never be
+        False for all hosts.
+        """
+        return all((h in self.last_daemon_update or h in self.mgr.offline_hosts)
+                   for h in self.get_hosts())
+
+
+class EventStore():
+    def __init__(self, mgr):
+        # type: (CephadmOrchestrator) -> None
+        self.mgr: CephadmOrchestrator = mgr
+        self.events = {} # type: Dict[str, List[OrchestratorEvent]]
+
+    def add(self, event: OrchestratorEvent) -> None:
+
+        if event.kind_subject() not in self.events:
+            self.events[event.kind_subject()] = [event]
+
+        for e in self.events[event.kind_subject()]:
+            if e.message == event.message:
+                return
+
+        self.events[event.kind_subject()].append(event)
+
+        # limit to five events for now.
+        self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
+
+    def for_service(self, spec: ServiceSpec, level, message) -> None:
+        e = OrchestratorEvent(datetime.datetime.utcnow(), 'service', spec.service_name(), level, message)
+        self.add(e)
+
+    def from_orch_error(self, e: OrchestratorError):
+        if e.event_subject is not None:
+            self.add(OrchestratorEvent(
+                datetime.datetime.utcnow(),
+                e.event_subject[0],
+                e.event_subject[1],
+                "ERROR",
+                str(e)
+            ))
+
+
+    def for_daemon(self, daemon_name, level, message):
+        e = OrchestratorEvent(datetime.datetime.utcnow(), 'daemon', daemon_name, level, message)
+        self.add(e)
+
+    def for_daemon_from_exception(self, daemon_name, e: Exception):
+        self.for_daemon(
+            daemon_name,
+            "ERROR",
+            str(e)
+        )
+
+    def cleanup(self) -> None:
+        # Needs to be properly done, in case events are persistently stored.
+
+        unknowns: List[str] = []
+        daemons = self.mgr.cache.get_daemon_names()
+        specs = self.mgr.spec_store.specs.keys()
+        for k_s, v in self.events.items():
+            kind, subject = k_s.split(':')
+            if kind == 'service':
+                if subject not in specs:
+                    unknowns.append(k_s)
+            elif kind == 'daemon':
+                if subject not in daemons:
+                    unknowns.append(k_s)
+
+        for k_s in unknowns:
+            del self.events[k_s]
+
+    def get_for_service(self, name) -> List[OrchestratorEvent]:
+        return self.events.get('service:' + name, [])
+
+    def get_for_daemon(self, name) -> List[OrchestratorEvent]:
+        return self.events.get('daemon:' + name, [])