]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/ssh/module.py
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / pybind / mgr / ssh / module.py
index b6ebad6e5ed7aedfb12549421c4214d8e1dd7c4b..73675e520572ba91f80d480c9301becbaec48a58 100644 (file)
@@ -1,8 +1,10 @@
 import json
 import errno
+import logging
+from functools import wraps
+
 import six
 import os
-import datetime
 import tempfile
 import multiprocessing.pool
 
@@ -18,13 +20,13 @@ except ImportError as e:
     remoto = None
     remoto_import_error = str(e)
 
-DATEFMT = '%Y-%m-%d %H:%M:%S.%f'
+logger = logging.getLogger(__name__)
 
 # high-level TODO:
 #  - bring over some of the protections from ceph-deploy that guard against
 #    multiple bootstrapping / initialization
 
-class SSHReadCompletion(orchestrator.ReadCompletion):
+class SSHCompletionmMixin(object):
     def __init__(self, result):
         if isinstance(result, multiprocessing.pool.AsyncResult):
             self._result = [result]
@@ -36,34 +38,13 @@ class SSHReadCompletion(orchestrator.ReadCompletion):
     def result(self):
         return list(map(lambda r: r.get(), self._result))
 
+class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion):
     @property
     def is_complete(self):
         return all(map(lambda r: r.ready(), self._result))
 
-class SSHReadCompletionReady(SSHReadCompletion):
-    def __init__(self, result):
-        self._result = result
-
-    @property
-    def result(self):
-        return self._result
-
-    @property
-    def is_complete(self):
-        return True
 
-class SSHWriteCompletion(orchestrator.WriteCompletion):
-    def __init__(self, result):
-        super(SSHWriteCompletion, self).__init__()
-        if isinstance(result, multiprocessing.pool.AsyncResult):
-            self._result = [result]
-        else:
-            self._result = result
-        assert isinstance(self._result, list)
-
-    @property
-    def result(self):
-        return list(map(lambda r: r.get(), self._result))
+class SSHWriteCompletion(SSHCompletionmMixin, orchestrator.WriteCompletion):
 
     @property
     def is_persistent(self):
@@ -115,6 +96,23 @@ class SSHConnection(object):
     def __getattr__(self, name):
         return getattr(self.conn, name)
 
+
+def log_exceptions(f):
+    if six.PY3:
+        return f
+    else:
+        # Python 2 does no exception chaining, thus the
+        # real exception is lost
+        @wraps(f)
+        def wrapper(*args, **kwargs):
+            try:
+                return f(*args, **kwargs)
+            except Exception:
+                logger.exception('something went wrong.')
+                raise
+        return wrapper
+
+
 class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
     _STORE_HOST_PREFIX = "host"
@@ -143,6 +141,14 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         self._cluster_fsid = None
         self._worker_pool = multiprocessing.pool.ThreadPool(1)
 
+        # the keys in inventory_cache are authoritative.
+        #   You must not call remove_outdated()
+        # The values are cached by instance.
+        # cache is invalidated by
+        # 1. timeout
+        # 2. refresh parameter
+        self.inventory_cache = orchestrator.OutdatablePersistentDict(self, self._STORE_HOST_PREFIX)
+
     def handle_command(self, inbuf, command):
         if command["prefix"] == "ssh set-ssh-config":
             return self._set_ssh_config(inbuf, command)
@@ -170,23 +176,17 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         complete = True
         for c in completions:
+            if c.is_complete:
+                continue
+
             if not isinstance(c, SSHReadCompletion) and \
                     not isinstance(c, SSHWriteCompletion):
                 raise TypeError("unexpected completion: {}".format(c.__class__))
 
-            if c.is_complete:
-                continue
-
             complete = False
 
         return complete
 
-    @staticmethod
-    def time_from_string(timestr):
-        # drop the 'Z' timezone indication, it's always UTC
-        timestr = timestr.rstrip('Z')
-        return datetime.datetime.strptime(timestr, DATEFMT)
-
     def _get_cluster_fsid(self):
         """
         Fetch and cache the cluster fsid.
@@ -202,12 +202,10 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         """
         if isinstance(hosts, six.string_types):
             hosts = [hosts]
-        unregistered_hosts = []
-        for host in hosts:
-            key = self._hostname_to_store_key(host)
-            if not self.get_store(key):
-                unregistered_hosts.append(host)
+        keys = self.inventory_cache.keys()
+        unregistered_hosts = set(hosts) - keys
         if unregistered_hosts:
+            logger.warning('keys = {}'.format(keys))
             raise RuntimeError("Host(s) {} not registered".format(
                 ", ".join(map(lambda h: "'{}'".format(h),
                     unregistered_hosts))))
@@ -367,21 +365,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         conn.remote_module.write_keyring(keyring_path, keyring)
         return keyring_path
 
-    def _hostname_to_store_key(self, host):
-        return "{}.{}".format(self._STORE_HOST_PREFIX, host)
-
     def _get_hosts(self, wanted=None):
-        if wanted:
-            hosts_info = []
-            for host in wanted:
-                key = self._hostname_to_store_key(host)
-                info = self.get_store(key)
-                if info:
-                    hosts_info.append((key, info))
-        else:
-            hosts_info = six.iteritems(self.get_store_prefix(self._STORE_HOST_PREFIX))
-
-        return list(map(lambda kv: (kv[0], json.loads(kv[1])), hosts_info))
+        return self.inventory_cache.items_filtered(wanted)
 
     def add_host(self, host):
         """
@@ -389,13 +374,9 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         :param host: host name
         """
+        @log_exceptions
         def run(host):
-            key = self._hostname_to_store_key(host)
-            self.set_store(key, json.dumps({
-                "host": host,
-                "inventory": None,
-                "last_inventory_refresh": None
-            }))
+            self.inventory_cache[host] = orchestrator.OutdatableData()
             return "Added host '{}'".format(host)
 
         return SSHWriteCompletion(
@@ -407,9 +388,9 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         :param host: host name
         """
+        @log_exceptions
         def run(host):
-            key = self._hostname_to_store_key(host)
-            self.set_store(key, None)
+            del self.inventory_cache[host]
             return "Removed host '{}'".format(host)
 
         return SSHWriteCompletion(
@@ -425,11 +406,8 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
         TODO:
           - InventoryNode probably needs to be able to report labels
         """
-        nodes = []
-        for key, host_info in self._get_hosts():
-            node = orchestrator.InventoryNode(host_info["host"], [])
-            nodes.append(node)
-        return SSHReadCompletionReady(nodes)
+        nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
+        return orchestrator.TrivialReadCompletion(nodes)
 
     def _get_device_inventory(self, host):
         """
@@ -475,42 +453,23 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
             # this implies the returned hosts are registered
             hosts = self._get_hosts()
 
-        def run(key, host_info):
-            updated = False
-            host = host_info["host"]
-
-            if not host_info["inventory"]:
-                self.log.info("caching inventory for '{}'".format(host))
-                host_info["inventory"] = self._get_device_inventory(host)
-                updated = True
-            else:
-                timeout_min = int(self.get_module_option(
-                    "inventory_cache_timeout_min",
-                    self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
-
-                cutoff = datetime.datetime.utcnow() - datetime.timedelta(
-                        minutes=timeout_min)
-
-                last_update = self.time_from_string(host_info["last_inventory_refresh"])
+        @log_exceptions
+        def run(host, host_info):
+            # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
 
-                if last_update < cutoff or refresh:
-                    self.log.info("refresh stale inventory for '{}'".format(host))
-                    host_info["inventory"] = self._get_device_inventory(host)
-                    updated = True
-                else:
-                    self.log.info("reading cached inventory for '{}'".format(host))
-                    pass
+            timeout_min = int(self.get_module_option(
+                "inventory_cache_timeout_min",
+                self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
 
-            if updated:
-                now = datetime.datetime.utcnow()
-                now = now.strftime(DATEFMT)
-                host_info["last_inventory_refresh"] = now
-                self.set_store(key, json.dumps(host_info))
-
-            devices = list(map(lambda di:
-                orchestrator.InventoryDevice.from_ceph_volume_inventory(di),
-                host_info["inventory"]))
+            if host_info.outdated(timeout_min) or refresh:
+                self.log.info("refresh stale inventory for '{}'".format(host))
+                data = self._get_device_inventory(host)
+                host_info = orchestrator.OutdatableData(data)
+                self.inventory_cache[host] = host_info
+            else:
+                self.log.debug("reading cached inventory for '{}'".format(host))
 
+            devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data)
             return orchestrator.InventoryNode(host, devices)
 
         results = []
@@ -520,6 +479,7 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         return SSHReadCompletion(results)
 
+    @log_exceptions
     def _create_osd(self, host, drive_group):
         conn = self._get_connection(host)
         try:
@@ -760,7 +720,6 @@ class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
             raise NotImplementedError("Removing managers is not supported")
 
         # check that all the hosts are registered
-        hosts = list(set(hosts))
         self._require_hosts(hosts)
 
         # we assume explicit placement by which there are the same number of